diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 4b529839..fafd905c 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -189,7 +189,7 @@ groups: required: false - key: ckanext.xloader.search_update_chunks default: 100000 - example: True + example: 1000 description: | The number of rows to process in each batch when populating the full-text search index. Chunked processing prevents database timeouts and memory @@ -198,3 +198,11 @@ groups: improve performance but may cause timeouts on very large tables. type: int required: false + - key: ckanext.xloader.max_retries + default: 1 + example: 3 + description: | + Maximum number of retry attempts for failed jobs due to temporary errors + like database deadlocks or network timeouts. Set to 0 to disable retries. + type: int + required: false diff --git a/ckanext/xloader/job_exceptions.py b/ckanext/xloader/job_exceptions.py index 587c94f5..88f8f2e3 100644 --- a/ckanext/xloader/job_exceptions.py +++ b/ckanext/xloader/job_exceptions.py @@ -52,3 +52,7 @@ def __str__(self): class LoaderError(JobError): '''Exception that's raised if a load fails''' pass + +class XLoaderTimeoutError(JobError): + """Custom timeout exception that can be retried""" + pass \ No newline at end of file diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 3a678991..e3f48a68 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -22,7 +22,7 @@ from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config, h from . 
import db, loader -from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError +from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError, XLoaderTimeoutError from .utils import datastore_resource_exists, set_resource_metadata, modify_input_url @@ -41,11 +41,13 @@ CHUNK_SIZE = 16 * 1024 # 16kb DOWNLOAD_TIMEOUT = 30 -MAX_RETRIES = 1 +MAX_RETRIES = int(config.get('ckanext.xloader.max_retries', 1)) RETRYABLE_ERRORS = ( errors.DeadlockDetected, errors.LockNotAvailable, errors.ObjectInUse, + HTTPError, + XLoaderTimeoutError ) # Retries can only occur in cases where the datastore entry exists, # so use the standard timeout @@ -53,6 +55,38 @@ APITOKEN_HEADER_NAME = config.get('apitoken_header_name', 'Authorization') +def is_retryable_error(error): + """ + Determine if an error should trigger a retry attempt. + + Checks if the error is a temporary/transient condition that might + succeed on retry. Returns True for retryable HTTP status codes and + other temporary errors. 
+
+    Retryable HTTP status codes:
+    - 408 Request Timeout
+    - 429 Too Many Requests
+    - 500 Internal Server Error
+    - 502 Bad Gateway
+    - 503 Service Unavailable
+    - 504 Gateway Timeout
+    - 507 Insufficient Storage
+    - 522 Connection Timed Out (Cloudflare)
+    - 524 A Timeout Occurred (Cloudflare)
+
+    :param error: Exception object to check
+    :type error: Exception
+    :return: True if error should be retried, False otherwise
+    :rtype: bool
+    """
+    if isinstance(error, HTTPError):
+        retryable_status_codes = {408, 429, 500, 502, 503, 504, 507, 522, 524}
+        return error.status_code in retryable_status_codes
+    # Non-HTTP errors only reach this check after matching RETRYABLE_ERRORS,
+    # so they are always considered retryable.
+    return True
+
+
 # input = {
 #     'api_key': user['apikey'],
 #     'job_type': 'xloader_to_datastore',
@@ -112,36 +146,12 @@ def xloader_data_into_datastore(input):
         job_dict['error'] = str(e)
         log.error('xloader error: job_id %s already exists', job_id)
         errored = True
-    except JobError as e:
-        db.mark_job_as_errored(job_id, str(e))
-        job_dict['status'] = 'error'
-        job_dict['error'] = str(e)
-        log.error('xloader error: %s, %s', e, traceback.format_exc())
-        errored = True
     except Exception as e:
-        if isinstance(e, RETRYABLE_ERRORS):
-            tries = job_dict['metadata'].get('tries', 0)
-            if tries < MAX_RETRIES:
-                tries = tries + 1
-                log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
-                logger.info("Job failed due to temporary error [%s], retrying", e)
-                job_dict['status'] = 'pending'
-                job_dict['metadata']['tries'] = tries
-                enqueue_job(
-                    xloader_data_into_datastore,
-                    [input],
-                    title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
-                        job_dict['metadata']['resource_id'], tries),
-                    rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
-                )
-                return None
-
-        db.mark_job_as_errored(
-            job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
-        job_dict['status'] = 'error'
-        job_dict['error'] = str(e)
-        log.error('xloader error: %s, %s', e, traceback.format_exc())
-        errored = True
+        error_state = {'errored': errored}
+        retry = 
handle_retryable_error(e, input, job_id, job_dict, logger, error_state) + if retry: + return None + errored = error_state['errored'] finally: # job_dict is defined in xloader_hook's docstring is_saved_ok = callback_xloader_hook(result_url=input['result_url'], @@ -151,6 +161,54 @@ def xloader_data_into_datastore(input): return 'error' if errored else None +def handle_retryable_error(e, input, job_id, job_dict, logger, error_state): + """ + Handle retryable errors by attempting to retry the job or marking it as failed. + + Checks if the error is retryable (database deadlocks, HTTP timeouts, etc.) and + within the retry limit. If so, enqueues a new job attempt. Otherwise, marks + the job as errored. + + :param e: The exception that occurred + :type e: Exception + :param input: Job input data containing metadata and API key + :type input: dict + :param job_id: Unique identifier for the current job + :type job_id: str + :param job_dict: Job status dictionary with metadata and status + :type job_dict: dict + :param logger: Logger instance for the current job + :type logger: logging.Logger + :param error_state: Mutable dict to track error state {'errored': bool} + :type error_state: dict + + :returns: True if job was retried, None otherwise + :rtype: bool or None + """ + if isinstance(e, RETRYABLE_ERRORS) and is_retryable_error(e): + tries = job_dict['metadata'].get('tries', 0) + if tries < MAX_RETRIES: + tries = tries + 1 + log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) + logger.info("Job failed due to temporary error [%s], retrying", e) + job_dict['status'] = 'pending' + job_dict['metadata']['tries'] = tries + enqueue_job( + xloader_data_into_datastore, + [input], + title="retry xloader_data_into_datastore: resource: {} attempt {}".format( + job_dict['metadata']['resource_id'], tries), + rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) + ) + return True + db.mark_job_as_errored( + job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) + 
job_dict['status'] = 'error' + job_dict['error'] = str(e) + log.error('xloader error: %s, %s', e, traceback.format_exc()) + error_state['errored'] = True + + def xloader_data_into_datastore_(input, job_dict, logger): '''This function: * downloads the resource (metadata) from CKAN @@ -380,7 +438,7 @@ def _download_resource_data(resource, data, api_key, logger): request_url=url, response=error) except requests.exceptions.Timeout: logger.warning('URL time out after %ss', DOWNLOAD_TIMEOUT) - raise JobError('Connection timed out after {}s'.format( + raise XLoaderTimeoutError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: tmp_file.close() diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index f23e7821..a5866f3c 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -201,6 +201,91 @@ def test_data_with_rq_job_timeout(self, cli, data): # make sure that the tmp file has been closed/deleted in job timeout exception handling assert file_suffix not in f + @pytest.mark.parametrize("error_type,should_retry", [ + # Retryable errors from RETRYABLE_ERRORS + ("DeadlockDetected", True), + ("LockNotAvailable", True), + ("ObjectInUse", True), + ("XLoaderTimeoutError", True), + # Retryable HTTP errors (status codes from is_retryable_error) + ("HTTPError_408", True), + ("HTTPError_429", True), + ("HTTPError_500", True), + ("HTTPError_502", True), + ("HTTPError_503", True), + ("HTTPError_504", True), + ("HTTPError_507", True), + ("HTTPError_522", True), + ("HTTPError_524", True), + # Non-retryable HTTP errors + ("HTTPError_400", False), + ("HTTPError_404", False), + ("HTTPError_403", False), + # Other non-retryable errors (not in RETRYABLE_ERRORS) + ("ValueError", False), + ("TypeError", False), + ]) + def test_retry_behavior(self, cli, data, error_type, should_retry): + """Test retry behavior for different error types.""" + + def 
create_mock_error(error_type): + if error_type == "DeadlockDetected": + from psycopg2 import errors + return errors.DeadlockDetected() + elif error_type == "LockNotAvailable": + from psycopg2 import errors + return errors.LockNotAvailable() + elif error_type == "ObjectInUse": + from psycopg2 import errors + return errors.ObjectInUse() + elif error_type == "XLoaderTimeoutError": + return jobs.XLoaderTimeoutError('Connection timed out after 30s') + elif error_type.startswith("HTTPError_"): + status_code = int(error_type.split("_")[1]) + return jobs.HTTPError("HTTP Error", status_code=status_code, request_url="test", response=None) + elif error_type == "ValueError": + return ValueError("Test error") + elif error_type == "TypeError": + return TypeError("Test error") + + + def mock_download_with_error(*args, **kwargs): + if not hasattr(mock_download_with_error, 'call_count'): + mock_download_with_error.call_count = 0 + mock_download_with_error.call_count += 1 + + if mock_download_with_error.call_count == 1: + # First call - raise the test error + raise create_mock_error(error_type) + elif should_retry: + # Second call - return successful response only if retryable + import tempfile + tmp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv') + tmp_file.write(_TEST_FILE_CONTENT) + tmp_file.flush() + return (tmp_file, 'd44fa65eda3675e11710682fdb5f1648') + else: + # Non-retryable errors should not get a second chance + raise create_mock_error(error_type) + + self.enqueue(jobs.xloader_data_into_datastore, [data]) + + with mock.patch("ckanext.xloader.jobs._download_resource_data", mock_download_with_error): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + + if should_retry: + # Check that retry was attempted + assert "Job failed due to temporary error" in stdout + assert "retrying" in stdout + assert "Express Load completed" in stdout + # Verify resource was successfully loaded after retry + resource = 
helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) + assert resource["datastore_contains_all_records_of_source_file"] + else: + # Check that job failed without retry - should have error messages + assert "xloader error:" in stdout or "error" in stdout.lower() + assert "Express Load completed" not in stdout + @pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object):