|
77 | 77 | from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
|
78 | 78 | from shared_configs.configs import MODEL_SERVER_HOST
|
79 | 79 | from shared_configs.configs import MODEL_SERVER_PORT
|
| 80 | +from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA |
80 | 81 | from shared_configs.configs import SLACK_CHANNEL_ID
|
81 | 82 |
|
82 | 83 | logger = setup_logger()
|
@@ -189,59 +190,67 @@ def acquire_tenants(self) -> None:
|
189 | 190 | continue
|
190 | 191 |
|
191 | 192 | logger.debug(f"Acquired lock for tenant {tenant_id}")
|
192 |
| - with get_session_with_tenant(tenant_id) as db_session: |
193 |
| - try: |
194 |
| - logger.debug( |
195 |
| - f"Setting tenant ID context variable for tenant {tenant_id}" |
196 |
| - ) |
197 |
| - token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id or "public") |
198 |
| - slack_bot_tokens = fetch_tokens() |
199 |
| - logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}") |
200 |
| - CURRENT_TENANT_ID_CONTEXTVAR.reset(token) |
201 |
| - logger.debug( |
202 |
| - f"Reset tenant ID context variable for tenant {tenant_id}" |
203 |
| - ) |
204 |
| - |
205 |
| - if not slack_bot_tokens: |
206 |
| - logger.debug(f"No Slack bot token found for tenant {tenant_id}") |
| 193 | + token = CURRENT_TENANT_ID_CONTEXTVAR.set( |
| 194 | + tenant_id or POSTGRES_DEFAULT_SCHEMA |
| 195 | + ) |
| 196 | + try: |
| 197 | + with get_session_with_tenant(tenant_id) as db_session: |
| 198 | + try: |
| 199 | + logger.debug( |
| 200 | + f"Setting tenant ID context variable for tenant {tenant_id}" |
| 201 | + ) |
| 202 | + slack_bot_tokens = fetch_tokens() |
| 203 | + logger.debug(f"Fetched Slack bot tokens for tenant {tenant_id}") |
| 204 | + logger.debug( |
| 205 | + f"Reset tenant ID context variable for tenant {tenant_id}" |
| 206 | + ) |
| 207 | + |
| 208 | + if not slack_bot_tokens: |
| 209 | + logger.debug( |
| 210 | + f"No Slack bot token found for tenant {tenant_id}" |
| 211 | + ) |
| 212 | + if tenant_id in self.socket_clients: |
| 213 | + asyncio.run(self.socket_clients[tenant_id].close()) |
| 214 | + del self.socket_clients[tenant_id] |
| 215 | + del self.slack_bot_tokens[tenant_id] |
| 216 | + continue |
| 217 | + |
| 218 | + if ( |
| 219 | + tenant_id not in self.slack_bot_tokens |
| 220 | + or slack_bot_tokens != self.slack_bot_tokens[tenant_id] |
| 221 | + ): |
| 222 | + if tenant_id in self.slack_bot_tokens: |
| 223 | + logger.info( |
| 224 | + f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting" |
| 225 | + ) |
| 226 | + else: |
| 227 | + search_settings = get_current_search_settings( |
| 228 | + db_session |
| 229 | + ) |
| 230 | + embedding_model = EmbeddingModel.from_db_model( |
| 231 | + search_settings=search_settings, |
| 232 | + server_host=MODEL_SERVER_HOST, |
| 233 | + server_port=MODEL_SERVER_PORT, |
| 234 | + ) |
| 235 | + warm_up_bi_encoder(embedding_model=embedding_model) |
| 236 | + |
| 237 | + self.slack_bot_tokens[tenant_id] = slack_bot_tokens |
| 238 | + |
| 239 | + if tenant_id in self.socket_clients: |
| 240 | + asyncio.run(self.socket_clients[tenant_id].close()) |
| 241 | + |
| 242 | + self.start_socket_client(tenant_id, slack_bot_tokens) |
| 243 | + |
| 244 | + except KvKeyNotFoundError: |
| 245 | + logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}") |
207 | 246 | if tenant_id in self.socket_clients:
|
208 | 247 | asyncio.run(self.socket_clients[tenant_id].close())
|
209 | 248 | del self.socket_clients[tenant_id]
|
210 | 249 | del self.slack_bot_tokens[tenant_id]
|
211 |
| - continue |
212 |
| - |
213 |
| - if ( |
214 |
| - tenant_id not in self.slack_bot_tokens |
215 |
| - or slack_bot_tokens != self.slack_bot_tokens[tenant_id] |
216 |
| - ): |
217 |
| - if tenant_id in self.slack_bot_tokens: |
218 |
| - logger.info( |
219 |
| - f"Slack Bot tokens have changed for tenant {tenant_id} - reconnecting" |
220 |
| - ) |
221 |
| - else: |
222 |
| - search_settings = get_current_search_settings(db_session) |
223 |
| - embedding_model = EmbeddingModel.from_db_model( |
224 |
| - search_settings=search_settings, |
225 |
| - server_host=MODEL_SERVER_HOST, |
226 |
| - server_port=MODEL_SERVER_PORT, |
227 |
| - ) |
228 |
| - warm_up_bi_encoder(embedding_model=embedding_model) |
229 |
| - |
230 |
| - self.slack_bot_tokens[tenant_id] = slack_bot_tokens |
231 |
| - |
232 |
| - if tenant_id in self.socket_clients: |
233 |
| - asyncio.run(self.socket_clients[tenant_id].close()) |
234 |
| - |
235 |
| - self.start_socket_client(tenant_id, slack_bot_tokens) |
236 |
| - |
237 |
| - except KvKeyNotFoundError: |
238 |
| - logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}") |
239 |
| - if tenant_id in self.socket_clients: |
240 |
| - asyncio.run(self.socket_clients[tenant_id].close()) |
241 |
| - del self.socket_clients[tenant_id] |
242 |
| - del self.slack_bot_tokens[tenant_id] |
243 |
| - except Exception as e: |
244 |
| - logger.exception(f"Error handling tenant {tenant_id}: {e}") |
| 250 | + except Exception as e: |
| 251 | + logger.exception(f"Error handling tenant {tenant_id}: {e}") |
| 252 | + finally: |
| 253 | + CURRENT_TENANT_ID_CONTEXTVAR.reset(token) |
245 | 254 |
|
246 | 255 | def send_heartbeats(self) -> None:
|
247 | 256 | current_time = int(time.time())
|
|
0 commit comments