
Commit 118ea90

Merge pull request #181 from dathere/refactor-upload-log-level
Refactor upload log level
2 parents: e80462c + c21a0c7

4 files changed (+114, -141 lines)

ckanext/datapusher_plus/config.py

Lines changed: 3 additions & 4 deletions
@@ -25,10 +25,9 @@
 # QSV version requirements
 MINIMUM_QSV_VERSION = "4.0.0"
 
-# Logging verbosity (0 = None, 1 = INFO, 2 = DEBUG)
-UPLOAD_LOG_VERBOSITY = tk.asint(
-    tk.config.get("ckanext.datapusher_plus.upload_log_verbosity", 1)
-)
+# Logging level
+# DEBUG, INFO, WARNING, ERROR, CRITICAL
+UPLOAD_LOG_LEVEL = tk.config.get("ckanext.datapusher_plus.upload_log_level", "INFO")
 
 # PII screening settings
 PII_SCREENING = tk.asbool(tk.config.get("ckanext.datastore_plus.pii_screening", False))
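With this change the setting is stored as a plain level name ("DEBUG", "INFO", "WARNING", "ERROR" or "CRITICAL") rather than a numeric verbosity, and jobs.py maps it to a numeric logging level with getattr(logging, ...). A minimal sketch of that mapping, with a guarded fallback for unrecognized names; the resolve_log_level helper and its fallback are illustrative, not part of this commit:

import logging

def resolve_log_level(name: str, default: int = logging.INFO) -> int:
    """Map a level name from config ("DEBUG", "INFO", ...) to its numeric value."""
    level = getattr(logging, str(name).upper(), None)
    # getattr returns None for unknown names; fall back instead of raising
    return level if isinstance(level, int) else default

print(resolve_log_level("DEBUG"))    # 10
print(resolve_log_level("warning"))  # 30
print(resolve_log_level("verbose"))  # falls back to 20 (INFO)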

ckanext/datapusher_plus/config_declaration.yaml

Lines changed: 3 additions & 4 deletions
@@ -178,9 +178,8 @@ groups:
         description: |
           Auto alias unique
 
-      - key: ckanext.datapusher_plus.upload_log_verbosity
-        type: int
+      - key: ckanext.datapusher_plus.upload_log_level
         editable: true
-        default: 1
+        default: INFO
         description: |
-          Upload log verbosity (0 = None, 1 = INFO, 2 = DEBUG)
+          Upload log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
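The declaration exposes the new option to site admins with INFO as the default. Assuming the usual CKAN ini layout, overriding it looks like this (the [app:main] section name is the standard CKAN one, not something introduced by this commit):

[app:main]
ckanext.datapusher_plus.upload_log_level = DEBUG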

ckanext/datapusher_plus/jobs.py

Lines changed: 55 additions & 53 deletions
@@ -316,21 +316,19 @@ def process_formulae(
         if not formula_fields:
             return
 
-        if conf.UPLOAD_LOG_VERBOSITY >= 1:
-            self.logger.info(
-                f"Found {len(formula_fields)} {entity_type.upper()} field/s with {formula_type} in the scheming_yaml"
-            )
+        self.logger.info(
+            f"Found {len(formula_fields)} {entity_type.upper()} field/s with {formula_type} in the scheming_yaml"
+        )
 
         jinja2_formulae = {}
         for schema_field in formula_fields:
             field_name = schema_field["field_name"]
             template = schema_field[formula_type]
             jinja2_formulae[field_name] = template
 
-            if conf.UPLOAD_LOG_VERBOSITY >= 2:
-                self.logger.debug(
-                    f'Jinja2 {formula_type} for {entity_type.upper()} field "{field_name}": {template}'
-                )
+            self.logger.debug(
+                f'Jinja2 {formula_type} for {entity_type.upper()} field "{field_name}": {template}'
+            )
 
         context = {"package": self.package, "resource": self.resource_fields_stats}
         context.update(jinja2_formulae)
@@ -344,10 +342,9 @@ def process_formulae(
                 rendered_formula = formula.render(**context)
                 updates[field_name] = rendered_formula
 
-                if conf.UPLOAD_LOG_VERBOSITY >= 2:
-                    self.logger.debug(
-                        f'Evaluated jinja2 {formula_type} for {entity_type.upper()} field "{field_name}": {rendered_formula}'
-                    )
+                self.logger.debug(
+                    f'Evaluated jinja2 {formula_type} for {entity_type.upper()} field "{field_name}": {rendered_formula}'
+                )
             except Exception as e:
                 self.logger.error(
                     f'Error evaluating jinja2 {formula_type} for {entity_type.upper()} field "{field_name}": {str(e)}'
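These hunks drop the UPLOAD_LOG_VERBOSITY guards around the Jinja2 formula logging and let the logger's own level decide what is emitted; the formulae themselves are still rendered with formula.render(**context) against a context keyed by package and resource. For orientation, a self-contained sketch of that rendering step; the template text and values are invented for illustration:

from jinja2 import Environment

env = Environment()
suggestion_formula = env.from_string(
    "{{ package.title | upper }} has {{ resource.record_count }} rows"
)
context = {
    "package": {"title": "Air Quality"},
    "resource": {"record_count": 1234},
}
print(suggestion_formula.render(**context))  # AIR QUALITY has 1234 rows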
@@ -445,9 +442,15 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     handler = utils.StoringHandler(task_id, input)
     logger = logging.getLogger(task_id)
     logger.addHandler(handler)
+
     # also show logs on stderr
     logger.addHandler(logging.StreamHandler())
-    logger.setLevel(logging.DEBUG)
+    log_level = getattr(logging, conf.UPLOAD_LOG_LEVEL.upper())
+
+    # set the log level to the config upload_log_level
+    logger.setLevel(logging.INFO)
+    logger.info(f"Setting log level to {logging.getLevelName(int(log_level))}")
+    logger.setLevel(log_level)
 
     # check if conf.QSV_BIN and conf.FILE_BIN exists
     # qsv_path = Path(conf.QSV_BIN)
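The job logger is now set to INFO just long enough to announce the configured level, then switched to it, so every plain logger.info()/logger.debug() call that follows is filtered by the standard logging machinery rather than by UPLOAD_LOG_VERBOSITY checks. A minimal sketch of that filtering behaviour (logger name and messages are illustrative):

import logging
import sys

logger = logging.getLogger("dpp-example")
logger.addHandler(logging.StreamHandler(sys.stderr))

logger.setLevel(logging.WARNING)
logger.info("suppressed: below the WARNING threshold")
logger.warning("emitted: at or above the threshold")

logger.setLevel(getattr(logging, "DEBUG"))
logger.debug("emitted once the level is lowered to DEBUG")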
@@ -478,8 +481,8 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     qsv_semver = qsv_version_info[
         qsv_version_info.find(" ") : qsv_version_info.find("-")
     ].lstrip()
-    if conf.UPLOAD_LOG_VERBOSITY >= 1:
-        logger.info("qsv version found: {}".format(qsv_semver))
+
+    logger.info("qsv version found: {}".format(qsv_semver))
     try:
         if semver.compare(qsv_semver, conf.MINIMUM_QSV_VERSION) < 0:
             raise utils.JobError(
@@ -520,8 +523,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     timer_start = time.perf_counter()
 
     # fetch the resource data
-    if conf.UPLOAD_LOG_VERBOSITY >= 1:
-        logger.info("Fetching from: {0}...".format(resource_url))
+    logger.info("Fetching from: {0}...".format(resource_url))
     headers = {}
     if resource.get("url_type") == "upload":
         # If this is an uploaded file to CKAN, authenticate the request,
@@ -538,8 +540,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             scheme=rewrite_url.scheme, netloc=rewrite_url.netloc
         )
         resource_url = new_url.geturl()
-        if conf.UPLOAD_LOG_VERBOSITY >= 1:
-            logger.info("Rewritten resource url to: {0}".format(resource_url))
+        logger.info("Rewritten resource url to: {0}".format(resource_url))
 
     try:
         kwargs = {
@@ -600,11 +601,9 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
 
         # download the file
         if cl:
-            if conf.UPLOAD_LOG_VERBOSITY >= 1:
-                logger.info("Downloading {:.2MB} file...".format(DataSize(int(cl))))
+            logger.info("Downloading {:.2MB} file...".format(DataSize(int(cl))))
         else:
-            if conf.UPLOAD_LOG_VERBOSITY >= 1:
-                logger.info("Downloading file of unknown size...")
+            logger.info("Downloading file of unknown size...")
 
         with open(tmp, "wb") as tmp_file:
             for chunk in response.iter_content(conf.CHUNK_SIZE):
@@ -667,6 +666,20 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             )
         )
 
+    # # Check if the file is a zip file
+    # if resource_format.upper() == "ZIP":
+    #     logger.info("ZIP file detected...")
+    #     # get the zip file's metadata
+    #     # zip_metadata = zipfile.ZipFile(tmp, "r").infolist()
+    #     # logger.info("ZIP file metadata: {}".format(zip_metadata))
+
+    #     # # get the zip file's contents
+    #     # zip_contents = zipfile.ZipFile(tmp, "r").namelist()
+    #     # logger.info("ZIP file contents: {}".format(zip_contents))
+
+    #     extracted_metadata = dph.extract_zip_or_metadata(tmp, os.path.join(temp_dir, "zip_metadata.csv"))
+    #     logger.info("Extracted metadata: {}".format(extracted_metadata))
+
     # ===================================================================================
     # ANALYZE WITH QSV
     # ===================================================================================
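The newly added, still commented-out block sketches future ZIP handling. dph.extract_zip_or_metadata is the extension's own helper, but the stdlib calls the comments reference behave as below; a small sketch, with the archive path invented for illustration:

import zipfile

with zipfile.ZipFile("example.zip", "r") as zf:
    print(zf.namelist())            # archive member names
    for info in zf.infolist():      # per-member metadata
        print(info.filename, info.file_size, info.date_time)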
@@ -862,16 +875,12 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     qsv_input_csv = os.path.join(temp_dir, "qsv_input.csv")
     # if resource_format is CSV we don't need to normalize
     if resource_format.upper() == "CSV":
-        if conf.UPLOAD_LOG_VERBOSITY >= 1:
-            logger.info(
-                "Normalizing/UTF-8 transcoding {}...".format(resource_format)
-            )
+        logger.info("Normalizing/UTF-8 transcoding {}...".format(resource_format))
     else:
         # if not CSV (e.g. TSV, TAB, etc.) we need to normalize to CSV
-        if conf.UPLOAD_LOG_VERBOSITY >= 1:
-            logger.info(
-                "Normalizing/UTF-8 transcoding {} to CSV...".format(resource_format)
-            )
+        logger.info(
+            "Normalizing/UTF-8 transcoding {} to CSV...".format(resource_format)
+        )
 
     qsv_input_utf_8_encoded_csv = os.path.join(
         temp_dir, "qsv_input_utf_8_encoded.csv"
@@ -884,10 +893,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
         capture_output=True,
         text=True,
     )
-    if conf.UPLOAD_LOG_VERBOSITY >= 1:
-        logger.info(
-            "Identified encoding of the file: {}".format(file_encoding.stdout)
-        )
+    logger.info("Identified encoding of the file: {}".format(file_encoding.stdout))
 
     # trim the encoding string
     file_encoding.stdout = file_encoding.stdout.strip()
@@ -1258,10 +1264,9 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             if type_override in list(conf.TYPE_MAPPING.values()):
                 h["type"] = type_override
 
-    if conf.UPLOAD_LOG_VERBOSITY >= 1:
-        logger.info(
-            "Determined headers and types: {headers}...".format(headers=headers_dicts)
-        )
+    logger.info(
+        "Determined headers and types: {headers}...".format(headers=headers_dicts)
+    )
 
     # save stats to the datastore by loading qsv_stats_csv directly using COPY
     stats_table = sql.Identifier(resource_id + "-druf-stats")
@@ -1318,13 +1323,14 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     )
 
     # Copy stats CSV to /tmp directory for debugging purposes
-    if conf.UPLOAD_LOG_VERBOSITY >= 2:
+    more_debug_info = logger.getEffectiveLevel() >= logging.DEBUG
+    if more_debug_info:
         try:
             debug_stats_path = os.path.join("/tmp", os.path.basename(qsv_stats_csv))
             shutil.copy2(qsv_stats_csv, debug_stats_path)
-            logger.info(f"Copied stats CSV to {debug_stats_path} for debugging")
+            logger.debug(f"Copied stats CSV to {debug_stats_path} for debugging")
         except Exception as e:
-            logger.warning(f"Failed to copy stats CSV to /tmp for debugging: {e}")
+            logger.debug(f"Failed to copy stats CSV to /tmp for debugging: {e}")
 
     try:
         with open(qsv_stats_csv, "r") as f:
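The /tmp debug copies are now gated on the logger's level rather than a separate verbosity flag. In the stdlib, numeric levels grow with severity (DEBUG=10, INFO=20, WARNING=30), and the conventional test for "is DEBUG output enabled" is logger.isEnabledFor(logging.DEBUG), i.e. an effective level at or below DEBUG. A minimal sketch of that check, not the commit's exact code:

import logging

logger = logging.getLogger("dpp-example")

logger.setLevel(logging.INFO)
print(logger.isEnabledFor(logging.DEBUG))   # False: INFO (20) is above DEBUG (10)

logger.setLevel(logging.DEBUG)
print(logger.isEnabledFor(logging.DEBUG))   # True
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("expensive debug-only work goes here")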
@@ -1373,13 +1379,13 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     )
 
     # Copy frequency CSV to /tmp directory for debugging purposes
-    if conf.UPLOAD_LOG_VERBOSITY >= 2:
+    if more_debug_info:
         try:
             debug_freq_path = os.path.join("/tmp", os.path.basename(qsv_freq_csv))
             shutil.copy2(qsv_freq_csv, debug_freq_path)
-            logger.info(f"Copied frequency CSV to {debug_freq_path} for debugging")
+            logger.debug(f"Copied frequency CSV to {debug_freq_path} for debugging")
         except Exception as e:
-            logger.warning(f"Failed to copy frequency CSV to /tmp for debugging: {e}")
+            logger.debug(f"Failed to copy frequency CSV to /tmp for debugging: {e}")
 
     # load the frequency table using COPY
     copy_sql = sql.SQL("COPY {} FROM STDIN WITH (FORMAT CSV, HEADER TRUE)").format(
@@ -1833,8 +1839,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
     package_id = resource["package_id"]
     scheming_yaml, package = get_scheming_yaml(package_id, scheming_yaml_type="dataset")
 
-    if conf.UPLOAD_LOG_VERBOSITY >= 2:
-        logger.debug(f"package: {package}")
+    logger.debug(f"package: {package}")
 
     # Initialize the formula processor
     formula_processor = FormulaProcessor(
@@ -1853,8 +1858,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
         package.update(package_updates)
         try:
             patched_package = patch_package(package)
-            if conf.UPLOAD_LOG_VERBOSITY >= 2:
-                logger.debug(f"Package after patching: {patched_package}")
+            logger.debug(f"Package after patching: {patched_package}")
             package = patched_package
             logger.info("PACKAGE formulae processed...")
         except Exception as e:
@@ -1882,8 +1886,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             revised_package = revise_package(
                 package_id, update={"dpp_suggestions": revise_update_content}
            )
-            if conf.UPLOAD_LOG_VERBOSITY >= 2:
-                logger.debug(f"Package after revising: {revised_package}")
+            logger.debug(f"Package after revising: {revised_package}")
             package = revised_package
             logger.info("PACKAGE suggestion formulae processed...")
         except Exception as e:
@@ -1912,8 +1915,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
             revised_package = revise_package(
                 package_id, update={"dpp_suggestions": revise_update_content}
            )
-            if conf.UPLOAD_LOG_VERBOSITY >= 2:
-                logger.debug(f"Package after revising: {revised_package}")
+            logger.debug(f"Package after revising: {revised_package}")
             package = revised_package
             logger.info("RESOURCE suggestion formulae processed...")
         except Exception as e:
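One stdlib detail worth keeping in mind for the DEBUG-level package dumps above: an f-string argument is built eagerly even when the record is filtered out, whereas %-style arguments are only formatted if the record is actually emitted. A small sketch (logger name and the package dict are illustrative):

import logging

logger = logging.getLogger("dpp-example")
logger.setLevel(logging.INFO)

package = {"name": "air-quality", "resources": ["..."]}

# the f-string builds the full message even though DEBUG is filtered out...
logger.debug(f"package: {package}")
# ...while %-style arguments are only formatted when the record is emitted
logger.debug("package: %s", package)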
