Use pysimdjson for parsing wat records #49

Open · wants to merge 1 commit into main
18 changes: 18 additions & 0 deletions json_extractor.py
@@ -0,0 +1,18 @@

class JsonExtractor:
    """Provides the best available JSON module"""

    def __init__(self):
        try:
            import simdjson
            self.json = simdjson.Parser()
            self.parse = self.json.parse
Contributor:
Could write:

    self.parse = lambda j: self.json.parse(j, True)

to force recursive parsing and avoid incompatibilities.

However, then one of the major performance benefits of the simdjson module fades away.

        except ImportError:
            import json
            self.json = json
            self.parse = self.json.loads

    # Shadowed by the instance attribute set in __init__; documents the expected signature.
    def parse(self, s, *args, **kwargs):
        """High-performance parsing alternative to `loads`, creating only the necessary
        Python objects when simdjson is installed. Falls back to `loads` otherwise."""
        raise NotImplementedError("JSON parser not initialized")

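For illustration (not part of the diff), a minimal interpreter-session sketch of how the extractor is meant to be used; the payload keys mirror the ones already accessed in server_ip_address.py below:

>>> from json_extractor import JsonExtractor
>>> extractor = JsonExtractor()
>>> record = extractor.parse(b'{"Envelope": {"WARC-Header-Metadata": {"WARC-Type": "response"}}}')
>>> record['Envelope']['WARC-Header-Metadata']['WARC-Type']
'response'

With pysimdjson installed, `record` is a lazy csimdjson.Object rather than a plain dict, which is exactly the incompatibility discussed in the review comments below.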
4 changes: 3 additions & 1 deletion requirements.txt
@@ -22,4 +22,6 @@ lxml
# to parse HTML (used in cc_index_word_count.py) using Resiliparse (https://pypi.org/project/Resiliparse/). Resiliparse requires compatible fastwarc version.
#Resiliparse
# (tested with)
#Resiliparse==0.14.1
# Performant JSON parser. Comment out if not available for your environment.
pysimdjson
7 changes: 6 additions & 1 deletion server_count.py
@@ -1,5 +1,6 @@
import ujson as json

from json_extractor import JsonExtractor
from sparkcc import CCSparkJob


@@ -10,6 +11,10 @@ class ServerCountJob(CCSparkJob):
    name = "CountServers"
    fallback_server_name = '(no server in HTTP header)'

    def iterate_records(self, _warc_uri, archive_iterator):
        self.json_extractor = JsonExtractor()
        return super().iterate_records(_warc_uri, archive_iterator)

    def process_record(self, record):
        # Notes:
        # - HTTP headers may include multiple "Server" headers, often indicating
@@ -21,7 +26,7 @@ def process_record(self, record):

        if self.is_wat_json_record(record):
            # WAT (response) record
            record = json.loads(self.get_payload_stream(record).read())
            record = self.json_extractor.parse(self.get_payload_stream(record).read())
Contributor:
Unfortunately, processing the JSON may raise an exception:

  File "/mnt/data/wastl/proj/cc/git/cc-pyspark/sparkcc.py", line 377, in iterate_records
    for res in self.process_record(record):
               ~~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/mnt/data/wastl/proj/cc/git/cc-pyspark/server_count.py", line 42, in process_record
    server_names.append(headers[header].strip())
                        ^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'csimdjson.Array' object has no attribute 'strip'

Here is a short snippet showing why this happens:

>>> import simdjson
>>> type(simdjson.Parser().parse('[1,2]'))
<class 'csimdjson.Array'>
>>> type(simdjson.Parser().parse('[1,2]', True))
<class 'list'>

Contributor:
Would need to add an extra check:
    ... or isinstance(headers[header], simdjson.Array)

But then it's no drop-in replacement anymore.
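For illustration only, a rough sketch of what that extra check might look like in server_count.py; the names headers, header and server_names are taken from the traceback above, the rest is assumed:

    import simdjson

    value = headers[header]
    if isinstance(value, (list, simdjson.Array)):
        # hypothetical: several "Server" headers parsed into an array
        for name in value:
            server_names.append(name.strip())
    else:
        server_names.append(value.strip())

This assumes that iterating a csimdjson.Array of strings yields plain Python str values; the recursive parse(j, True) suggested earlier would sidestep the question entirely, at some cost in performance.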

            try:
                payload = record['Envelope']['Payload-Metadata']
                if 'HTTP-Response-Metadata' in payload:
8 changes: 7 additions & 1 deletion server_ip_address.py
@@ -4,6 +4,7 @@

from pyspark.sql.types import StructType, StructField, StringType, LongType

from json_extractor import JsonExtractor
from sparkcc import CCSparkJob


@@ -23,13 +24,18 @@ class ServerIPAddressJob(CCSparkJob):
    response_no_ip_address = '(no IP address)'
    response_no_host = '(no host name)'


    def iterate_records(self, _warc_uri, archive_iterator):
        self.json_extractor = JsonExtractor()
        return super().iterate_records(_warc_uri, archive_iterator)

    def process_record(self, record):
        ip_address = None
        url = None

        if self.is_wat_json_record(record):
            # WAT (response) record
            record = json.loads(self.get_payload_stream(record).read())
            record = self.json_extractor.parse(self.get_payload_stream(record).read())
            try:
                warc_header = record['Envelope']['WARC-Header-Metadata']
                if warc_header['WARC-Type'] != 'response':
12 changes: 7 additions & 5 deletions wat_extract_links.py
@@ -8,6 +8,7 @@

from pyspark.sql.types import StructType, StructField, StringType

from json_extractor import JsonExtractor
from sparkcc import CCSparkJob


@@ -68,6 +69,7 @@ def _url_join(base, link):

    def iterate_records(self, warc_uri, archive_iterator):
        """Iterate over all WARC records and process them"""
        self.json_extractor = JsonExtractor()
        self.processing_robotstxt_warc \
            = ExtractLinksJob.robotstxt_warc_path_pattern.match(warc_uri)
        for record in archive_iterator:
@@ -79,7 +81,7 @@ def process_record(self, record):
        link_count = 0
        if self.is_wat_json_record(record):
            try:
                wat_record = json.loads(self.get_payload_stream(record).read())
                wat_record = self.json_extractor.parse(self.get_payload_stream(record).read())
            except ValueError as e:
                self.get_logger().error('Failed to load JSON: {}'.format(e))
                self.records_failed.add(1)
@@ -188,9 +190,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None):
        has_links = False
        for l in links:
            link = None
            if url_attr in l:
            if url_attr is not None and url_attr in l:
                link = l[url_attr]
            elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
            elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
                link = l[opt_attr]
            else:
                continue
@@ -422,9 +424,9 @@ def yield_links(self, src_url, base_url, links, url_attr, opt_attr=None,
        for l in links:
            if not l:
                continue
            if url_attr in l:
            if url_attr is not None and url_attr in l:
Contributor:
Good observation and thanks for testing this. Unfortunately, it's not the only incompatibility.

                link = l[url_attr]
            elif opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
            elif opt_attr is not None and opt_attr in l and ExtractLinksJob.url_abs_pattern.match(l[opt_attr]):
                link = l[opt_attr]
            else:
                continue