diff --git a/json_extractor.py b/json_extractor.py
new file mode 100644
index 0000000..950d4db
--- /dev/null
+++ b/json_extractor.py
@@ -0,0 +1,18 @@
+
+class JsonExtractor:
+    """Provides the fastest available JSON parser: pysimdjson if installed, the standard library json module otherwise"""
+
+    def __init__(self):
+        try:
+            import simdjson
+            self.json = simdjson.Parser()
+            self.parse = self.json.parse
+        except ImportError:
+            import json
+            self.json = json
+            self.parse = self.json.loads
+    def parse(self, s, *args, **kwargs):
+        """High-performance alternative to `loads`, creating only the necessary Python objects when simdjson is installed;
+        falls back to the standard library `loads` otherwise. This stub is shadowed by the parser assigned in `__init__`."""
+        raise NotImplementedError("JSON parser not initialized")
+
diff --git a/requirements.txt b/requirements.txt
index 091743d..d7434b9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,4 +22,6 @@ lxml
 # to parse HTML (used in cc_index_word_count.py) using Resiliparse (https://pypi.org/project/Resiliparse/). Resiliparse requires compatible fastwarc version.
 #Resiliparse
 # (tested with)
-#Resiliparse==0.14.1
\ No newline at end of file
+#Resiliparse==0.14.1
+# Performant JSON parser. Comment out if not available in your environment.
+pysimdjson
\ No newline at end of file
diff --git a/server_count.py b/server_count.py
index 8901817..995deed 100644
--- a/server_count.py
+++ b/server_count.py
@@ -1,5 +1,6 @@
 import ujson as json
 
+from json_extractor import JsonExtractor
 from sparkcc import CCSparkJob
 
 
@@ -10,6 +11,10 @@ class ServerCountJob(CCSparkJob):
     name = "CountServers"
     fallback_server_name = '(no server in HTTP header)'
 
+    def iterate_records(self, _warc_uri, archive_iterator):
+        self.json_extractor = JsonExtractor()
+        return super().iterate_records(_warc_uri, archive_iterator)
+
     def process_record(self, record):
         # Notes:
         # - HTTP headers may include multiple "Server" headers, often indicating
@@ -21,7 +26,7 @@ def process_record(self, record):
 
         if self.is_wat_json_record(record):
             # WAT (response) record
-            record = json.loads(self.get_payload_stream(record).read())
+            record = self.json_extractor.parse(self.get_payload_stream(record).read())
             try:
                 payload = record['Envelope']['Payload-Metadata']
                 if 'HTTP-Response-Metadata' in payload:
diff --git a/server_ip_address.py b/server_ip_address.py
index bf3f30c..91de004 100644
--- a/server_ip_address.py
+++ b/server_ip_address.py
@@ -4,6 +4,7 @@
 
 from pyspark.sql.types import StructType, StructField, StringType, LongType
 
+from json_extractor import JsonExtractor
 from sparkcc import CCSparkJob
 
 
@@ -23,13 +24,18 @@ class ServerIPAddressJob(CCSparkJob):
     response_no_ip_address = '(no IP address)'
     response_no_host = '(no host name)'
 
+
+    def iterate_records(self, _warc_uri, archive_iterator):
+        self.json_extractor = JsonExtractor()
+        return super().iterate_records(_warc_uri, archive_iterator)
+
     def process_record(self, record):
         ip_address = None
         url = None
 
         if self.is_wat_json_record(record):
             # WAT (response) record
-            record = json.loads(self.get_payload_stream(record).read())
+            record = self.json_extractor.parse(self.get_payload_stream(record).read())
             try:
                 warc_header = record['Envelope']['WARC-Header-Metadata']
                 if warc_header['WARC-Type'] != 'response':
diff --git a/wat_extract_links.py b/wat_extract_links.py
index 46c18b8..184e4db 100644
--- a/wat_extract_links.py
+++ b/wat_extract_links.py
@@ -8,6 +8,7 @@
 
 from pyspark.sql.types import StructType, StructField, StringType
 
+from json_extractor import JsonExtractor
 from sparkcc import CCSparkJob
 
 
@@ -68,6 +69,7 @@ def _url_join(base, link):
 
     def iterate_records(self, warc_uri, archive_iterator):
"""Iterate over all WARC records and process them""" + self.json_extractor = JsonExtractor() self.processing_robotstxt_warc \ = ExtractLinksJob.robotstxt_warc_path_pattern.match(warc_uri) for record in archive_iterator: @@ -79,7 +81,7 @@ def process_record(self, record): link_count = 0 if self.is_wat_json_record(record): try: - wat_record = json.loads(self.get_payload_stream(record).read()) + wat_record = self.json_extractor.parse(self.get_payload_stream(record).read()) except ValueError as e: self.get_logger().error('Failed to load JSON: {}'.format(e)) self.records_failed.add(1) @@ -188,9 +190,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None): has_links = False for l in links: link = None - if url_attr in l: + if url_attr is not None and url_attr in l: link = l[url_attr] - elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]): + elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]): link = l[opt_attr] else: continue @@ -422,9 +424,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None, for l in links: if not l: continue - if url_attr in l: + if url_attr is not None and url_attr in l: link = l[url_attr] - elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]): + elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]): link = l[opt_attr] else: continue