24
24
from onyx .background .celery .apps .task_formatters import CeleryTaskPlainFormatter
25
25
from onyx .background .celery .celery_utils import celery_is_worker_primary
26
26
from onyx .background .celery .celery_utils import make_probe_path
27
- # from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFIX
28
- # from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY
27
+ from onyx .background .celery .tasks .vespa .document_sync import DOCUMENT_SYNC_PREFIX
28
+ from onyx .background .celery .tasks .vespa .document_sync import DOCUMENT_SYNC_TASKSET_KEY
29
29
from onyx .configs .constants import ONYX_CLOUD_CELERY_TASK_PREFIX
30
30
from onyx .configs .constants import OnyxRedisLocks
31
31
from onyx .db .engine .sql_engine import get_sqlalchemy_engine
32
32
from onyx .document_index .vespa .shared_utils .utils import wait_for_vespa_with_timeout
33
33
from onyx .httpx .httpx_pool import HttpxPool
34
- # from onyx.redis.redis_connector import RedisConnector
35
- # from onyx.redis.redis_connector_delete import RedisConnectorDelete
34
+ from onyx .redis .redis_connector import RedisConnector
35
+ from onyx .redis .redis_connector_delete import RedisConnectorDelete
36
36
from onyx .redis .redis_connector_doc_perm_sync import RedisConnectorPermissionSync
37
37
from onyx .redis .redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
38
38
from onyx .redis .redis_connector_prune import RedisConnectorPrune
@@ -157,9 +157,9 @@ def on_task_postrun(
157
157
# NOTE: we want to remove the `Redis*` classes, prefer to just have functions to
158
158
# do these things going forward. In short, things should generally be like the doc
159
159
# sync task rather than the others below
160
- # if task_id.startswith(DOCUMENT_SYNC_PREFIX):
161
- # r.srem(DOCUMENT_SYNC_TASKSET_KEY, task_id)
162
- # return
160
+ if task_id .startswith (DOCUMENT_SYNC_PREFIX ):
161
+ r .srem (DOCUMENT_SYNC_TASKSET_KEY , task_id )
162
+ return
163
163
164
164
if task_id .startswith (RedisDocumentSet .PREFIX ):
165
165
document_set_id = RedisDocumentSet .get_id_from_task_id (task_id )
@@ -175,33 +175,33 @@ def on_task_postrun(
175
175
r .srem (rug .taskset_key , task_id )
176
176
return
177
177
178
- # if task_id.startswith(RedisConnectorDelete.PREFIX):
179
- # cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
180
- # if cc_pair_id is not None:
181
- # RedisConnectorDelete.remove_from_taskset(int(cc_pair_id), task_id, r)
182
- # return
183
-
184
- # if task_id.startswith(RedisConnectorPrune.SUBTASK_PREFIX):
185
- # cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
186
- # if cc_pair_id is not None:
187
- # RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r)
188
- # return
189
-
190
- # if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX):
191
- # cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
192
- # if cc_pair_id is not None:
193
- # RedisConnectorPermissionSync.remove_from_taskset(
194
- # int(cc_pair_id), task_id, r
195
- # )
196
- # return
197
-
198
- # if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX):
199
- # cc_pair_id = RedisConnector.get_id_from_task_id(task_id)
200
- # if cc_pair_id is not None:
201
- # RedisConnectorExternalGroupSync.remove_from_taskset(
202
- # int(cc_pair_id), task_id, r
203
- # )
204
- # return
178
+ if task_id .startswith (RedisConnectorDelete .PREFIX ):
179
+ cc_pair_id = RedisConnector .get_id_from_task_id (task_id )
180
+ if cc_pair_id is not None :
181
+ RedisConnectorDelete .remove_from_taskset (int (cc_pair_id ), task_id , r )
182
+ return
183
+
184
+ if task_id .startswith (RedisConnectorPrune .SUBTASK_PREFIX ):
185
+ cc_pair_id = RedisConnector .get_id_from_task_id (task_id )
186
+ if cc_pair_id is not None :
187
+ RedisConnectorPrune .remove_from_taskset (int (cc_pair_id ), task_id , r )
188
+ return
189
+
190
+ if task_id .startswith (RedisConnectorPermissionSync .SUBTASK_PREFIX ):
191
+ cc_pair_id = RedisConnector .get_id_from_task_id (task_id )
192
+ if cc_pair_id is not None :
193
+ RedisConnectorPermissionSync .remove_from_taskset (
194
+ int (cc_pair_id ), task_id , r
195
+ )
196
+ return
197
+
198
+ if task_id .startswith (RedisConnectorExternalGroupSync .SUBTASK_PREFIX ):
199
+ cc_pair_id = RedisConnector .get_id_from_task_id (task_id )
200
+ if cc_pair_id is not None :
201
+ RedisConnectorExternalGroupSync .remove_from_taskset (
202
+ int (cc_pair_id ), task_id , r
203
+ )
204
+ return
205
205
206
206
207
207
def on_celeryd_init (sender : str , conf : Any = None , ** kwargs : Any ) -> None :
0 commit comments