100
100
from onyx .redis .redis_utils import is_fence
101
101
from onyx .server .runtime .onyx_runtime import OnyxRuntime
102
102
from onyx .utils .logger import setup_logger
103
+ from onyx .utils .memory_logger import log_memory_usage
103
104
from onyx .utils .middleware import make_randomized_onyx_request_id
104
105
from onyx .utils .telemetry import optional_telemetry
105
106
from onyx .utils .telemetry import RecordType
@@ -1279,6 +1280,10 @@ def _docprocessing_task(
1279
1280
f"batch_num={ batch_num } "
1280
1281
)
1281
1282
1283
+ log_memory_usage (
1284
+ f"docprocessing_task:start:batch_{ batch_num } :attempt_{ index_attempt_id } "
1285
+ )
1286
+
1282
1287
# Get the document batch storage
1283
1288
storage = get_document_batch_storage (cc_pair_id , index_attempt_id )
1284
1289
@@ -1297,7 +1302,16 @@ def _docprocessing_task(
1297
1302
per_batch_lock : RedisLock | None = None
1298
1303
try :
1299
1304
# Retrieve documents from storage
1305
+ log_memory_usage (
1306
+ f"docprocessing_task:before_load_batch_{ batch_num } :attempt_{ index_attempt_id } "
1307
+ )
1300
1308
documents = storage .get_batch (batch_num )
1309
+ log_memory_usage (
1310
+ f"docprocessing_task:after_load_batch_{ batch_num } :attempt_{ index_attempt_id } " ,
1311
+ documents ,
1312
+ f"batch_{ batch_num } _documents" ,
1313
+ )
1314
+
1301
1315
if not documents :
1302
1316
task_logger .error (f"No documents found for batch { batch_num } " )
1303
1317
return
@@ -1369,6 +1383,12 @@ def _docprocessing_task(
1369
1383
f"Processing { len (documents )} documents through indexing pipeline"
1370
1384
)
1371
1385
1386
+ log_memory_usage (
1387
+ f"docprocessing_task:before_indexing_pipeline:batch_{ batch_num } :attempt_{ index_attempt_id } " ,
1388
+ documents ,
1389
+ f"batch_{ batch_num } _documents_before_pipeline" ,
1390
+ )
1391
+
1372
1392
# real work happens here!
1373
1393
index_pipeline_result = run_indexing_pipeline (
1374
1394
embedder = embedding_model ,
@@ -1381,6 +1401,12 @@ def _docprocessing_task(
1381
1401
index_attempt_metadata = index_attempt_metadata ,
1382
1402
)
1383
1403
1404
+ log_memory_usage (
1405
+ f"docprocessing_task:after_indexing_pipeline:batch_{ batch_num } :attempt_{ index_attempt_id } " ,
1406
+ index_pipeline_result ,
1407
+ f"batch_{ batch_num } _pipeline_result" ,
1408
+ )
1409
+
1384
1410
# Update batch completion and document counts atomically using database coordination
1385
1411
1386
1412
with get_session_with_current_tenant () as db_session , cross_batch_db_lock :
@@ -1458,7 +1484,14 @@ def _docprocessing_task(
1458
1484
f"elapsed={ elapsed_time :.2f} s"
1459
1485
)
1460
1486
1487
+ log_memory_usage (
1488
+ f"docprocessing_task:completed:batch_{ batch_num } :attempt_{ index_attempt_id } "
1489
+ )
1490
+
1461
1491
except Exception :
1492
+ log_memory_usage (
1493
+ f"docprocessing_task:exception:batch_{ batch_num } :attempt_{ index_attempt_id } "
1494
+ )
1462
1495
task_logger .exception (
1463
1496
f"Document batch processing failed: "
1464
1497
f"batch_num={ batch_num } "
0 commit comments