Use pysimdjson for parsing wat records #49
@@ -0,0 +1,18 @@
+class JsonExtractor:
+    """Provides the best available JSON module"""
+
+    def __init__(self):
+        try:
+            import simdjson
+            self.json = simdjson.Parser()
+            self.parse = self.json.parse
+        except ImportError:
+            import json
+            self.json = json
+            self.parse = self.json.loads
+
+    def parse(self, s, *args, **kwargs):
+        """High-performance alternative to `loads`, creating only the necessary
+        Python objects when simdjson is installed. Falls back to `loads` otherwise."""
+        raise NotImplementedError("JSON parser not initialized")
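A quick usage sketch (not part of the PR) showing how the fallback behaves; the payload bytes are made up for illustration:

    from json_extractor import JsonExtractor

    extractor = JsonExtractor()
    record = extractor.parse(b'{"Envelope": {"Format": "WARC"}}')

    # With pysimdjson installed, `record` is a lazy proxy object rather than a
    # plain dict; with the standard-library fallback it is an ordinary dict.
    # Item access works the same way in both cases:
    print(record['Envelope']['Format'])   # WARC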
@@ -1,5 +1,6 @@
 import ujson as json

+from json_extractor import JsonExtractor
 from sparkcc import CCSparkJob

@@ -10,6 +11,10 @@ class ServerCountJob(CCSparkJob):
     name = "CountServers"
     fallback_server_name = '(no server in HTTP header)'

+    def iterate_records(self, _warc_uri, archive_iterator):
+        self.json_extractor = JsonExtractor()
+        return super().iterate_records(_warc_uri, archive_iterator)
+
     def process_record(self, record):
         # Notes:
         # - HTTP headers may include multiple "Server" headers, often indicating
@@ -21,7 +26,7 @@ def process_record(self, record):

         if self.is_wat_json_record(record):
             # WAT (response) record
-            record = json.loads(self.get_payload_stream(record).read())
+            record = self.json_extractor.parse(self.get_payload_stream(record).read())
Unfortunately, processing the JSON may raise an exception. Here is a short snippet showing why this happens:
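The snippet itself was not captured in this view. A hedged reconstruction of the kind of failure, based on the None-guards this PR adds further below (module usage and payload are illustrative):

    import simdjson

    parser = simdjson.Parser()
    links = parser.parse(b'[{"href": "http://example.com/"}]')

    for l in links:
        # `l` is a simdjson proxy object, not a plain dict:
        print(isinstance(l, dict))   # False
        # Code written for dict semantics can therefore break; e.g. a membership
        # test with a None key returns False on a dict but may raise TypeError
        # on the proxy, which expects string keys.
        try:
            print(None in l)
        except TypeError as exc:
            print('membership test failed:', exc)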
Would need to add an extra check: but then it's no drop-in replacement anymore.
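The exact check referred to is not shown in this capture; the None-guards added to yield_links later in this diff illustrate the kind of check that becomes necessary once parsed records are simdjson proxies rather than plain dicts:

    # Guard pattern used elsewhere in this PR: only test membership with a
    # string key, since the proxy may reject non-string keys that a dict
    # would simply report as absent.
    if url_attr is not None and url_attr in l:
        link = l[url_attr]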
             try:
                 payload = record['Envelope']['Payload-Metadata']
                 if 'HTTP-Response-Metadata' in payload:
@@ -8,6 +8,7 @@
 from pyspark.sql.types import StructType, StructField, StringType

+from json_extractor import JsonExtractor
 from sparkcc import CCSparkJob

@@ -68,6 +69,7 @@ def _url_join(base, link):

     def iterate_records(self, warc_uri, archive_iterator):
         """Iterate over all WARC records and process them"""
+        self.json_extractor = JsonExtractor()
         self.processing_robotstxt_warc \
             = ExtractLinksJob.robotstxt_warc_path_pattern.match(warc_uri)
         for record in archive_iterator:
@@ -79,7 +81,7 @@ def process_record(self, record):
         link_count = 0
         if self.is_wat_json_record(record):
             try:
-                wat_record = json.loads(self.get_payload_stream(record).read())
+                wat_record = self.json_extractor.parse(self.get_payload_stream(record).read())
             except ValueError as e:
                 self.get_logger().error('Failed to load JSON: {}'.format(e))
                 self.records_failed.add(1)
@@ -188,9 +190,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None):
         has_links = False
         for l in links:
             link = None
-            if url_attr in l:
+            if url_attr is not None and url_attr in l:
                 link = l[url_attr]
-            elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
+            elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
                 link = l[opt_attr]
             else:
                 continue
@@ -422,9 +424,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None,
         for l in links:
             if not l:
                 continue
-            if url_attr in l:
+            if url_attr is not None and url_attr in l:
Good observation and thanks for testing this. Unfortunately, it's not the only incompatibility.
                 link = l[url_attr]
-            elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
+            elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
                 link = l[opt_attr]
             else:
                 continue
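One further incompatibility worth illustrating (an assumption of what the comment above alludes to, not taken from the original thread): pysimdjson's Parser reuses its internal buffer, so a proxy object returned by an earlier parse() call can no longer be used once the parser parses the next document. Plain dicts returned by json.loads or ujson.loads have no such lifetime restriction:

    import simdjson

    parser = simdjson.Parser()
    first = parser.parse(b'{"a": 1}')
    # Reusing the parser invalidates `first`; the stale proxy may raise on access.
    second = parser.parse(b'{"b": 2}')
    try:
        print(first['a'])
    except Exception as exc:
        print('stale proxy can no longer be used:', exc)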
Could write:
to force recursive parsing and avoid incompatibilities.
However, then one of the major performance benefits of the simdjson module fades away.
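The snippet referred to is not captured above. A sketch of what forcing recursive parsing could look like, assuming pysimdjson's documented as_dict()/as_list() conversion methods (the helper name and structure are illustrative, not the code from the comment):

    import simdjson

    _parser = simdjson.Parser()

    def parse_to_plain_python(data):
        """Parse with simdjson, then eagerly convert the proxy into plain
        dicts/lists so downstream code keeps ordinary dict semantics. The
        eager conversion gives up the lazy, on-demand materialization that
        accounts for much of pysimdjson's speed advantage."""
        doc = _parser.parse(data)
        if hasattr(doc, 'as_dict'):
            return doc.as_dict()
        if hasattr(doc, 'as_list'):
            return doc.as_list()
        return doc  # scalars already come back as plain Python values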