
Commit 1dbeb4c

Actually make use of Error Monitor and add configuration option for it
1 parent 7bc85bb commit 1dbeb4c

8 files changed: +98 −43 lines changed

connectors/config.py

Lines changed: 7 additions & 0 deletions
@@ -87,6 +87,13 @@ def _default_config():
         "preflight_idle": 30,
         "max_errors": 20,
         "max_errors_span": 600,
+        "error_monitor": {
+            "max_total_errors": 1000,
+            "max_consecutive_errors": 10,
+            "max_error_rate": 0.15,
+            "error_window_size": 100,
+            "error_queue_size": 10,
+        },
         "max_concurrent_content_syncs": 1,
         "max_concurrent_access_control_syncs": 1,
         "max_file_download_size": DEFAULT_MAX_FILE_SIZE,

connectors/es/sink.py

Lines changed: 50 additions & 27 deletions
@@ -110,6 +110,10 @@ def __init__(self, cause=None):
         self.__cause__ = cause
 
 
+class DocumentIngestionError(Exception):
+    pass
+
+
 class Sink:
     """Send bulk operations in batches by consuming a queue.
 
@@ -136,6 +140,7 @@ def __init__(
         max_concurrency,
         max_retries,
         retry_interval,
+        error_monitor,
         logger_=None,
         enable_bulk_operations_logging=False,
     ):
@@ -145,6 +150,7 @@ def __init__(
         self.pipeline = pipeline
         self.chunk_mem_size = chunk_mem_size * 1024 * 1024
         self.bulk_tasks = ConcurrentTasks(max_concurrency=max_concurrency)
+        self.error_monitor = error_monitor
         self.max_retires = max_retries
         self.retry_interval = retry_interval
         self.error = None
@@ -272,18 +278,19 @@ async def _process_bulk_response(self, res, ids_to_ops, do_log=False):
             successful_result = result in SUCCESSFUL_RESULTS
             if not successful_result:
                 if "error" in item[action_item]:
+                    message = f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
+                    self.error_monitor.track_error(DocumentIngestionError(message))
                     if do_log:
-                        self._logger.debug(
-                            f"Failed to execute '{action_item}' on document with id '{doc_id}'. Error: {item[action_item].get('error')}"
-                        )
+                        self._logger.debug(message)
                     self.counters.increment(RESULT_ERROR, namespace=BULK_RESPONSES)
                 else:
+                    message = f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
+                    self.error_monitor.track_error(DocumentIngestionError(message))
                     if do_log:
-                        self._logger.debug(
-                            f"Executed '{action_item}' on document with id '{doc_id}', but got non-successful result: {result}"
-                        )
+                        self._logger.debug(message)
                     self.counters.increment(RESULT_UNDEFINED, namespace=BULK_RESPONSES)
             else:
+                self.error_monitor.track_success()
                 if do_log:
                     self._logger.debug(
                         f"Successfully executed '{action_item}' on document with id '{doc_id}'. Result: {result}"
@@ -424,6 +431,7 @@ def __init__(
         client,
         queue,
         index,
+        error_monitor,
         filter_=None,
         sync_rules_enabled=False,
         content_extraction_enabled=True,
@@ -449,27 +457,37 @@ def __init__(
         self.concurrent_downloads = concurrent_downloads
         self._logger = logger_ or logger
         self._canceled = False
+        self.error_monitor = error_monitor
         self.skip_unchanged_documents = skip_unchanged_documents
 
     async def _deferred_index(self, lazy_download, doc_id, doc, operation):
-        data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])
-
-        if data is not None:
-            self.counters.increment(BIN_DOCS_DOWNLOADED)
-            data.pop("_id", None)
-            data.pop(TIMESTAMP_FIELD, None)
-            doc.update(data)
-
-        doc.pop("_original_filename", None)
-
-        await self.put_doc(
-            {
-                "_op_type": operation,
-                "_index": self.index,
-                "_id": doc_id,
-                "doc": doc,
-            }
-        )
+        try:
+            data = await lazy_download(doit=True, timestamp=doc[TIMESTAMP_FIELD])
+
+            if data is not None:
+                self.counters.increment(BIN_DOCS_DOWNLOADED)
+                data.pop("_id", None)
+                data.pop(TIMESTAMP_FIELD, None)
+                doc.update(data)
+
+            doc.pop("_original_filename", None)
+
+            await self.put_doc(
+                {
+                    "_op_type": operation,
+                    "_index": self.index,
+                    "_id": doc_id,
+                    "doc": doc,
+                }
+            )
+            self.error_monitor.track_success()
+        except ForceCanceledError:
+            raise
+        except Exception as ex:
+            self._logger.error(
+                f"Failed to do deferred index operation for doc {doc_id}: {ex}"
+            )
+            self.error_monitor.track_error(ex)
 
     def force_cancel(self):
         self._canceled = True
@@ -599,6 +617,8 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
                         }
                     )
 
+                lazy_downloads.raise_any_exception()
+
                 await asyncio.sleep(0)
         finally:
             # wait for all downloads to be finished
@@ -803,7 +823,8 @@ class SyncOrchestrator:
     - once they are both over, returns totals
     """
 
-    def __init__(self, elastic_config, logger_=None):
+    def __init__(self, elastic_config, error_monitor, logger_=None):
+        self.error_monitor = error_monitor
         self._logger = logger_ or logger
         self._logger.debug(f"SyncOrchestrator connecting to {elastic_config['host']}")
         self.es_management_client = ESManagementClient(elastic_config)
@@ -884,7 +905,7 @@ def _extractor_task_running(self):
     async def cancel(self):
         if self._sink_task_running():
             self._logger.info(
-                f"Cancling the Sink task: {self._sink_task.name}"  # pyright: ignore
+                f"Canceling the Sink task: {self._sink_task.get_name()}"  # pyright: ignore
             )
             self._sink_task.cancel()
         else:
@@ -894,7 +915,7 @@ async def cancel(self):
 
         if self._extractor_task_running():
             self._logger.info(
-                f"Canceling the Extractor task: {self._extractor_task.name}"  # pyright: ignore
+                f"Canceling the Extractor task: {self._extractor_task.get_name()}"  # pyright: ignore
             )
             self._extractor_task.cancel()
         else:
@@ -1005,6 +1026,7 @@ async def async_bulk(
             self.es_management_client,
             stream,
             index,
+            error_monitor=self.error_monitor,
             filter_=filter_,
             sync_rules_enabled=sync_rules_enabled,
             content_extraction_enabled=content_extraction_enabled,
@@ -1031,6 +1053,7 @@ async def async_bulk(
             max_concurrency=max_concurrency,
             max_retries=max_bulk_retries,
             retry_interval=retry_interval,
+            error_monitor=self.error_monitor,
             logger_=self._logger,
             enable_bulk_operations_logging=enable_bulk_operations_logging,
         )
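
The _process_bulk_response changes above follow one pattern: every per-item bulk result now feeds the monitor, successes included, so the consecutive-error counter resets whenever a document goes through cleanly. A minimal sketch of that pattern in isolation, with simulated outcomes, assuming track_error() checks its thresholds and raises TooManyErrors as the connectors/utils.py hunk further down implies:

    from connectors.utils import ErrorMonitor, TooManyErrors

    class DocumentIngestionError(Exception):
        pass

    monitor = ErrorMonitor(max_consecutive_errors=2)

    # Simulated per-item outcomes from a bulk response: one success, then
    # three failures in a row, which exceeds max_consecutive_errors=2.
    outcomes = [True, False, False, False]

    try:
        for ok in outcomes:
            if ok:
                monitor.track_success()
            else:
                monitor.track_error(DocumentIngestionError("item failed"))
    except TooManyErrors as e:
        print(f"sync aborted: {e}")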

connectors/source.py

Lines changed: 6 additions & 2 deletions
@@ -32,7 +32,6 @@
 from connectors.logger import logger
 from connectors.utils import (
     TIKA_SUPPORTED_FILETYPES,
-    ErrorMonitor,
     convert_to_b64,
     epoch_timestamp_zulu,
     get_file_extension,
@@ -410,7 +409,7 @@ def __init__(self, configuration):
 
         # this will be overwritten by set_framework_config()
         self.framework_config = DataSourceFrameworkConfig.Builder().build()
-        self.error_monitor = ErrorMonitor()
+        self.error_monitor = None
 
     def __str__(self):
         return f"Datasource `{self.__class__.name}`"
@@ -419,6 +418,9 @@ def set_logger(self, logger_):
         self._logger = logger_
         self._set_internal_logger()
 
+    def set_error_monitor(self, error_monitor):
+        self.error_monitor = error_monitor
+
     def _set_internal_logger(self):
         # no op for BaseDataSource
         # if there are internal class (e.g. Client class) to which the logger need to be set,
@@ -778,12 +780,14 @@ async def download_and_extract_file(
             doc = await self.handle_file_content_extraction(
                 doc, source_filename, temp_filename
             )
+            self.error_monitor.track_success()
             return doc
         except Exception as e:
             self._logger.warning(
                 f"File download and extraction or conversion for file {source_filename} failed: {e}",
                 exc_info=True,
             )
+            self.error_monitor.track_error(e)
            if return_doc_if_failed:
                return doc
            else:
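
Note the ordering this introduces: BaseDataSource.error_monitor now starts as None, so set_error_monitor() must be called before download_and_extract_file() runs, or the track_success()/track_error() calls above would fail with an AttributeError on None. A sketch of the expected lifecycle, with MyDataSource standing in for any concrete source:

    source = MyDataSource(configuration=configuration)  # hypothetical subclass
    source.set_logger(job_logger)
    source.set_error_monitor(error_monitor)  # before any content extraction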

connectors/sources/mongo.py

Lines changed: 3 additions & 2 deletions
@@ -240,8 +240,9 @@ def _serialize(value):
                 value = _serialize(value.as_doc().to_dict())
             return value
 
-        for key, value in doc.items():
-            doc[key] = _serialize(value)
+        with self.with_error_monitoring():
+            for key, value in doc.items():
+                doc[key] = _serialize(value)
 
         return doc
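
The hunk wraps serialization in self.with_error_monitoring(), a helper this commit does not define in the excerpt shown here. A plausible shape for such a context manager, purely illustrative, assuming it records the outcome of the wrapped block on the source's ErrorMonitor:

    from contextlib import contextmanager

    @contextmanager
    def with_error_monitoring(self):
        # Hypothetical sketch: count the wrapped block as one monitored
        # operation, success or failure, re-raising on failure.
        try:
            yield
            self.error_monitor.track_success()
        except Exception as e:
            self.error_monitor.track_error(e)
            raise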

connectors/sync_job_runner.py

Lines changed: 5 additions & 2 deletions
@@ -35,7 +35,7 @@
     INDEXED_DOCUMENT_VOLUME,
 )
 from connectors.source import BaseDataSource
-from connectors.utils import truncate_id
+from connectors.utils import ErrorMonitor, truncate_id
 
 UTF_8 = "utf-8"
 
@@ -113,6 +113,8 @@ def __init__(
         self.es_config = es_config
         self.service_config = service_config
         self.sync_orchestrator = None
+        error_monitor_config = service_config.get("error_monitor", {})
+        self.error_monitor = ErrorMonitor(**error_monitor_config)
         self.job_reporting_task = None
         self.bulk_options = self.es_config.get("bulk", {})
         self._start_time = None
@@ -149,6 +151,7 @@ async def execute(self):
             configuration=self.sync_job.configuration
         )
         self.data_provider.set_logger(self.sync_job.logger)
+        self.data_provider.set_error_monitor(self.error_monitor)
         self.data_provider.set_framework_config(
             self._data_source_framework_config()
         )
@@ -183,7 +186,7 @@ async def execute(self):
         await self._update_native_connector_authentication()
 
         self.sync_orchestrator = SyncOrchestrator(
-            self.es_config, self.sync_job.logger
+            self.es_config, self.error_monitor, self.sync_job.logger
         )
 
         if job_type in [JobType.INCREMENTAL, JobType.FULL]:
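
Because the config dict is expanded straight into the constructor with ErrorMonitor(**error_monitor_config), an unrecognized key under error_monitor fails fast at sync start rather than being silently ignored. A quick illustration with a hypothetical misspelled key:

    from connectors.utils import ErrorMonitor

    ErrorMonitor(max_total_errors=1000)        # accepted keyword
    ErrorMonitor(**{"max_total_erors": 1000})  # TypeError: unexpected keyword argument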

connectors/utils.py

Lines changed: 4 additions & 4 deletions
@@ -1009,7 +1009,7 @@ def __init__(
         max_consecutive_errors=10,
         max_error_rate=0.15,
         error_window_size=100,
-        error_queue_size=20,
+        error_queue_size=10,
     ):
         self.max_error_rate = max_error_rate
         self.error_window_size = error_window_size
@@ -1082,13 +1082,13 @@ def _error_window_error_rate(self):
 
     def _raise_if_necessary(self):
         if self.consecutive_error_count > self.max_consecutive_errors:
-            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row"
+            msg = f"Exceeded maximum consecutive errors - saw {self.consecutive_error_count} errors in a row. Last error: {self.last_error}"
             raise TooManyErrors(msg) from self.last_error
         elif self.total_error_count > self.max_total_errors:
-            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors"
+            msg = f"Exceeded maximum total error count - saw {self.total_error_count} errors. Last error: {self.last_error}"
             raise TooManyErrors(msg) from self.last_error
         elif self.error_window_size > 0:
             error_rate = self._error_window_error_rate()
             if error_rate > self.max_error_rate:
-                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations."
+                msg = f"Exceeded maximum error ratio of {self.max_error_rate} for last {self.error_window_size} operations. Last error: {self.last_error}"
                 raise TooManyErrors(msg) from self.last_error
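
Beyond the consecutive and total counters, the sliding-window check trips when the error ratio over the last error_window_size operations exceeds max_error_rate. A minimal sketch of that behavior with the new defaults, on a synthetic workload, assuming the window rate is computed against the fixed window size as _error_window_error_rate() suggests:

    from connectors.utils import ErrorMonitor, TooManyErrors

    # 15% tolerated error rate over a 100-operation window; the other
    # thresholds are set high so only the window check can fire.
    monitor = ErrorMonitor(
        max_total_errors=10_000,
        max_consecutive_errors=10_000,
        max_error_rate=0.15,
        error_window_size=100,
    )

    try:
        for i in range(100):
            if i % 5 == 0:  # 20% failure rate, above the 15% threshold
                monitor.track_error(ValueError(f"operation {i} failed"))
            else:
                monitor.track_success()
    except TooManyErrors as e:
        print(e)  # message now includes the last error, per the hunk above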

tests/sources/support.py

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,7 @@
 # you may not use this file except in compliance with the Elastic License 2.0.
 #
 from contextlib import asynccontextmanager
+from unittest.mock import Mock
 
 from connectors.source import DEFAULT_CONFIGURATION, DataSourceConfiguration
 
@@ -18,6 +19,7 @@ async def create_source(klass, **extras):
         config[k] = DEFAULT_CONFIGURATION.copy() | {"value": v}
 
     source = klass(configuration=DataSourceConfiguration(config))
+    source.set_error_monitor(Mock())
     try:
         yield source
     finally:
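
With the Mock standing in for a real ErrorMonitor, sources built through create_source can call self.error_monitor.track_success() and track_error() freely in unit tests without tripping any thresholds.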
