Skip to content

Commit 5336748

Browse files
committed
thread safe approach to docprocessing logging
1 parent 9bc62cc commit 5336748

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

backend/onyx/background/celery/tasks/docprocessing/tasks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@
9292
from onyx.redis.redis_utils import is_fence
9393
from onyx.server.runtime.onyx_runtime import OnyxRuntime
9494
from onyx.utils.logger import setup_logger
95-
from onyx.utils.logger import TaskAttemptSingleton
9695
from onyx.utils.middleware import make_randomized_onyx_request_id
9796
from onyx.utils.telemetry import optional_telemetry
9897
from onyx.utils.telemetry import RecordType
9998
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
10099
from shared_configs.configs import INDEXING_MODEL_SERVER_PORT
101100
from shared_configs.configs import MULTI_TENANT
102101
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
102+
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
103103

104104
logger = setup_logger()
105105

@@ -1085,9 +1085,9 @@ def _docprocessing_task(
10851085
) -> None:
10861086
start_time = time.monotonic()
10871087

1088-
# set the indexing attempt ID so that all log messages from this process
1089-
# will have it added as a prefix
1090-
TaskAttemptSingleton.set_cc_and_index_id(index_attempt_id, cc_pair_id)
1088+
# Cannot use the TaskSingleton approach here because the worker is multithreaded
1089+
INDEX_ATTEMPT_INFO_CONTEXTVAR.set((cc_pair_id, index_attempt_id))
1090+
task_logger
10911091
if tenant_id:
10921092
CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
10931093

backend/onyx/utils/logger.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from shared_configs.configs import SLACK_CHANNEL_ID
1414
from shared_configs.configs import TENANT_ID_PREFIX
1515
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
16+
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
1617
from shared_configs.contextvars import ONYX_REQUEST_ID_CONTEXTVAR
1718

1819

@@ -102,8 +103,14 @@ def process(
102103
msg = f"[Doc Permissions Sync: {doc_permission_sync_ctx_dict['request_id']}] {msg}"
103104
break
104105

105-
index_attempt_id = TaskAttemptSingleton.get_index_attempt_id()
106-
cc_pair_id = TaskAttemptSingleton.get_connector_credential_pair_id()
106+
index_attempt_info = INDEX_ATTEMPT_INFO_CONTEXTVAR.get()
107+
if index_attempt_info:
108+
cc_pair_id: int | None = index_attempt_info[0]
109+
index_attempt_id: int | None = index_attempt_info[1]
110+
111+
else:
112+
index_attempt_id = TaskAttemptSingleton.get_index_attempt_id()
113+
cc_pair_id = TaskAttemptSingleton.get_connector_credential_pair_id()
107114

108115
if index_attempt_id is not None:
109116
msg = f"[Index Attempt: {index_attempt_id}] {msg}"

backend/shared_configs/contextvars.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
"onyx_request_id", default=None
2222
)
2323

24+
# Used to store cc pair id and index attempt id in multithreaded environments
25+
INDEX_ATTEMPT_INFO_CONTEXTVAR: contextvars.ContextVar[tuple[int, int] | None] = (
26+
contextvars.ContextVar("index_attempt_info", default=None)
27+
)
28+
2429
"""Utils related to contextvars"""
2530

2631

0 commit comments

Comments
 (0)