diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..e92c2fde 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -53,6 +53,13 @@ def xloader_submit(context, data_dict): if errors: raise p.toolkit.ValidationError(errors) + p.toolkit.check_access('xloader_submit', context, data_dict) + + # If sync is set to True, the xloader callback will be executed right + # away, instead of a job being enqueued. It will also delete any existing jobs + # for the given resource. This is only controlled by sysadmins or the system. + sync = data_dict.pop('sync', False) + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { @@ -166,15 +173,20 @@ def xloader_submit(context, data_dict): job = enqueue_job( jobs.xloader_data_into_datastore, [data], queue=custom_queue, title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), - rq_kwargs=dict(timeout=timeout) + rq_kwargs=dict(timeout=timeout, at_front=sync) ) except Exception: - log.exception('Unable to enqueued xloader res_id=%s', res_id) + if sync: + log.exception('Unable to xloader res_id=%s', res_id) + else: + log.exception('Unable to enqueued xloader res_id=%s', res_id) return False log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) - value = json.dumps({'job_id': job.id}) + if sync: + log.debug('Pushed xloader sync mode job=%s res_id=%s to front of queue', job.id, res_id) + task['value'] = value task['state'] = 'pending' task['last_updated'] = str(datetime.datetime.utcnow()) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 9487999d..6d6bf21b 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -141,8 +141,10 @@ groups: example: False description: | Resources are expected to have a Validation Schema, or use the default ones if not. + If this option is set to `False`, Resources that do not have a Validation Schema will be treated like they do not require Validation. + See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. - key: ckanext.xloader.clean_datastore_tables diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index d916cc54..155f955a 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -7,6 +7,7 @@ from ckan.model.domain_object import DomainObjectOperation from ckan.model.resource import Resource +from ckan.model.package import Package from . import action, auth, helpers as xloader_helpers, utils from ckanext.xloader.utils import XLoaderFormats @@ -91,6 +92,7 @@ def receive_validation_report(self, validation_report): sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True)) self._submit_to_xloader(res_dict, sync=sync) + # IDomainObjectModification def notify(self, entity, operation): @@ -183,7 +185,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict): + def _submit_to_xloader(self, resource_dict, sync=False): context = {"ignore_auth": True, "defer_commit": True} resource_format = resource_dict.get("format") if not XLoaderFormats.is_it_an_xloader_format(resource_format): @@ -203,14 +205,20 @@ def _submit_to_xloader(self, resource_dict): return try: - log.debug( - "Submitting resource %s to be xloadered", resource_dict["id"] - ) + if sync: + log.debug( + "xloadering resource %s in sync mode", resource_dict["id"] + ) + else: + log.debug( + "Submitting resource %s to be xloadered", resource_dict["id"] + ) toolkit.get_action("xloader_submit")( context, { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, + "sync": sync, }, ) except toolkit.ValidationError as e: diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..47ae65a5 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -16,6 +16,7 @@ boolean_validator = get_validator('boolean_validator') int_validator = get_validator('int_validator') OneOf = get_validator('OneOf') +ignore_not_sysadmin = get_validator('ignore_not_sysadmin') if p.toolkit.check_ckan_version('2.9'): unicode_safe = get_validator('unicode_safe') @@ -29,6 +30,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'sync': [ignore_missing, boolean_validator, ignore_not_sysadmin], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index bcba510e..067649c5 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +from rq import get_current_job from six import text_type as str, binary_type @@ -13,8 +14,6 @@ import ckan.plugins as p from ckan.plugins.toolkit import config, h, _ -from .job_exceptions import JobError - from logging import getLogger @@ -58,11 +57,14 @@ def requires_successful_validation_report(): def awaiting_validation(res_dict): + # type: (dict) -> bool """ Checks the existence of a logic action from the ckanext-validation plugin, thus supporting any extending of the Validation Plugin class. + Checks ckanext.xloader.validation.requires_successful_report config option value. + Checks ckanext.xloader.validation.enforce_schema config option value. Then checks the Resource's validation_status. """ @@ -273,7 +275,7 @@ def type_guess(rows, types=TYPES, strict=False): at_least_one_value = [] for ri, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): typesdict = {} for type in types: typesdict[type] = 0