Commit 137e3c8

feat: Implement Scrapy HTTP cache backend (#403)
I'm successfully using this code in my project. In 381c044 I explicitly specify the licensing. I haven't added docs or anything else (yet); I just made sure the code passes the linters and a type check. Relates: apify/actor-templates#303
1 parent 3c7de28 commit 137e3c8

8 files changed: +299 −1 lines changed

.github/workflows/run_code_checks.yaml

Lines changed: 1 addition & 1 deletion

@@ -26,5 +26,5 @@ jobs:
   integration_tests:
     name: Integration tests
     needs: [lint_check, type_check, unit_tests]
-    uses: apify/workflows/.github/workflows/python_integration_tests.yaml@main
+    uses: apify/workflows/.github/workflows/python_integration_tests.yaml@fix-integration-tests-from-forks
     secrets: inherit

docs/02_guides/05_scrapy.mdx

Lines changed: 7 additions & 0 deletions

@@ -40,6 +40,7 @@ The Apify SDK provides several custom components to support integration with the
 - [`apify.scrapy.ApifyScheduler`](https://docs.apify.com/sdk/python/reference/class/ApifyScheduler) - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests.
 - [`apify.scrapy.ActorDatasetPushPipeline`](https://docs.apify.com/sdk/python/reference/class/ActorDatasetPushPipeline) - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset.
 - [`apify.scrapy.ApifyHttpProxyMiddleware`](https://docs.apify.com/sdk/python/reference/class/ApifyHttpProxyMiddleware) - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service.
+- [`apify.scrapy.extensions.ApifyCacheStorage`](https://docs.apify.com/sdk/python/reference/class/ApifyCacheStorage) - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work.
 
 Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include:
 
@@ -94,6 +95,12 @@ The following example demonstrates a Scrapy Actor that scrapes page titles and e
 </TabItem>
 </Tabs>
 
+## Dealing with ‘imminent migration to another host’
+
+Under some circumstances, the platform may decide to [migrate your Actor](https://docs.apify.com/academy/expert-scraping-with-apify/migrations-maintaining-state) from one piece of infrastructure to another while a run is in progress. While [Crawlee](https://crawlee.dev/python)-based projects can pause and resume their work after a restart, achieving the same with a Scrapy-based project can be challenging.
+
+As a workaround for this issue (tracked as [apify/actor-templates#303](https://github.com/apify/actor-templates/issues/303)), turn on caching with `HTTPCACHE_ENABLED` and set `HTTPCACHE_EXPIRATION_SECS` to at least a few minutes; the exact value depends on your use case. If your Actor gets migrated and restarted, the subsequent run will hit the cache, making it fast and avoiding unnecessary resource consumption.
+
 ## Conclusion
 
 In this guide you learned how to use Scrapy in Apify Actors. You can now start building your own web scraping projects with Scrapy and the Apify SDK, and host them on the Apify platform. See the [Actor templates](https://apify.com/templates/categories/python) to get started with your own scraping tasks. If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/apify-sdk-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping!
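The new guide text above asks readers to set two ordinary Scrapy settings. A minimal sketch of what that looks like (the 7200-second TTL mirrors the template's settings.py in the next file; pick a value that fits your use case):

# Minimal sketch: enable Scrapy's built-in HTTP cache middleware so the Apify
# backend is used. apply_apify_settings() selects the storage backend
# automatically (see the utils.py change below); set HTTPCACHE_STORAGE
# yourself only if you wire your settings manually.
HTTPCACHE_ENABLED = True          # without this, caching won't work
HTTPCACHE_EXPIRATION_SECS = 7200  # expire cached responses after two hours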

docs/02_guides/code/scrapy_project/src/settings.py

Lines changed: 2 additions & 0 deletions

@@ -7,3 +7,5 @@
 TELNETCONSOLE_ENABLED = False
 # Do not change the Twisted reactor unless you really know what you are doing.
 TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
+HTTPCACHE_ENABLED = True
+HTTPCACHE_EXPIRATION_SECS = 7200
src/apify/scrapy/extensions/__init__.py

Lines changed: 3 additions & 0 deletions (new file)

from apify.scrapy.extensions._httpcache import ApifyCacheStorage

__all__ = ['ApifyCacheStorage']
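The re-export above gives the backend its public import path; the private module `_httpcache` stays an implementation detail. In Scrapy settings the class is referenced by that public dotted path, which is exactly the value `apply_apify_settings` sets in utils.py below:

# Reference the backend by its public dotted path, not the private module.
HTTPCACHE_STORAGE = 'apify.scrapy.extensions.ApifyCacheStorage'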
src/apify/scrapy/extensions/_httpcache.py

Lines changed: 212 additions & 0 deletions (new file)

from __future__ import annotations

import gzip
import io
import pickle
import re
import struct
from logging import getLogger
from time import time
from typing import TYPE_CHECKING

from scrapy.http.headers import Headers
from scrapy.responsetypes import responsetypes

from apify import Configuration
from apify.apify_storage_client import ApifyStorageClient
from apify.scrapy._async_thread import AsyncThread
from apify.storages import KeyValueStore

if TYPE_CHECKING:
    from scrapy import Request, Spider
    from scrapy.http.response import Response
    from scrapy.settings import BaseSettings
    from scrapy.utils.request import RequestFingerprinterProtocol

logger = getLogger(__name__)


class ApifyCacheStorage:
    """A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.

    It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches
    responses to requests. See the HTTP cache middleware settings (prefixed with `HTTPCACHE_`)
    in the Scrapy documentation for more information. Requires the asyncio Twisted reactor
    to be installed.
    """

    def __init__(self, settings: BaseSettings) -> None:
        self._expiration_max_items = 100
        self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
        self._spider: Spider | None = None
        self._kvs: KeyValueStore | None = None
        self._fingerprinter: RequestFingerprinterProtocol | None = None
        self._async_thread: AsyncThread | None = None

    def open_spider(self, spider: Spider) -> None:
        """Open the cache storage for a spider."""
        logger.debug('Using Apify key value cache storage', extra={'spider': spider})
        self._spider = spider
        self._fingerprinter = spider.crawler.request_fingerprinter
        kvs_name = get_kvs_name(spider.name)

        async def open_kvs() -> KeyValueStore:
            config = Configuration.get_global_configuration()
            if config.is_at_home:
                storage_client = ApifyStorageClient.from_config(config)
                return await KeyValueStore.open(name=kvs_name, storage_client=storage_client)
            return await KeyValueStore.open(name=kvs_name)

        logger.debug("Starting background thread for cache storage's event loop")
        self._async_thread = AsyncThread()
        logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
        self._kvs = self._async_thread.run_coro(open_kvs())

    def close_spider(self, _: Spider, current_time: int | None = None) -> None:
        """Close the cache storage for a spider."""
        if self._async_thread is None:
            raise ValueError('Async thread not initialized')

        logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
        if self._expiration_secs > 0:
            if current_time is None:
                current_time = int(time())

            async def expire_kvs() -> None:
                if self._kvs is None:
                    raise ValueError('Key value store not initialized')
                i = 0
                async for item in self._kvs.iterate_keys():
                    value = await self._kvs.get_value(item.key)
                    try:
                        gzip_time = read_gzip_time(value)
                    except Exception as e:
                        logger.warning(f'Malformed cache item {item.key}: {e}')
                        await self._kvs.set_value(item.key, None)
                    else:
                        if self._expiration_secs < current_time - gzip_time:
                            logger.debug(f'Expired cache item {item.key}')
                            await self._kvs.set_value(item.key, None)
                        else:
                            logger.debug(f'Valid cache item {item.key}')
                    if i == self._expiration_max_items:
                        break
                    i += 1

            self._async_thread.run_coro(expire_kvs())

        logger.debug('Closing cache storage')
        try:
            self._async_thread.close()
        except KeyboardInterrupt:
            logger.warning('Shutdown interrupted by KeyboardInterrupt!')
        except Exception:
            logger.exception('Exception occurred while shutting down cache storage')
        finally:
            logger.debug('Cache storage closed')

    def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
        """Retrieve a response from the cache storage."""
        if self._async_thread is None:
            raise ValueError('Async thread not initialized')
        if self._kvs is None:
            raise ValueError('Key value store not initialized')
        if self._fingerprinter is None:
            raise ValueError('Request fingerprinter not initialized')

        key = self._fingerprinter.fingerprint(request).hex()
        value = self._async_thread.run_coro(self._kvs.get_value(key))

        if value is None:
            logger.debug('Cache miss', extra={'request': request})
            return None

        if current_time is None:
            current_time = int(time())
        if 0 < self._expiration_secs < current_time - read_gzip_time(value):
            logger.debug('Cache expired', extra={'request': request})
            return None

        data = from_gzip(value)
        url = data['url']
        status = data['status']
        headers = Headers(data['headers'])
        body = data['body']
        respcls = responsetypes.from_args(headers=headers, url=url, body=body)

        logger.debug('Cache hit', extra={'request': request})
        return respcls(url=url, headers=headers, status=status, body=body)

    def store_response(self, _: Spider, request: Request, response: Response) -> None:
        """Store a response in the cache storage."""
        if self._async_thread is None:
            raise ValueError('Async thread not initialized')
        if self._kvs is None:
            raise ValueError('Key value store not initialized')
        if self._fingerprinter is None:
            raise ValueError('Request fingerprinter not initialized')

        key = self._fingerprinter.fingerprint(request).hex()
        data = {
            'status': response.status,
            'url': response.url,
            'headers': dict(response.headers),
            'body': response.body,
        }
        value = to_gzip(data)
        self._async_thread.run_coro(self._kvs.set_value(key, value))


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
    """Dump a dictionary to a gzip-compressed byte stream."""
    with io.BytesIO() as byte_stream:
        with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file:
            pickle.dump(data, gzip_file, protocol=4)
        return byte_stream.getvalue()


def from_gzip(gzip_bytes: bytes) -> dict:
    """Load a dictionary from a gzip-compressed byte stream."""
    with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
        data: dict = pickle.load(gzip_file)
        return data


def read_gzip_time(gzip_bytes: bytes) -> int:
    """Read the modification time from a gzip-compressed byte stream without decompressing the data."""
    header = gzip_bytes[:10]
    header_components = struct.unpack('<HBBI2B', header)
    mtime: int = header_components[3]
    return mtime


def get_kvs_name(spider_name: str, max_length: int = 60) -> str:
    """Get the key value store name for a spider.

    The key value store name is derived from the spider name by replacing all special characters
    with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with
    'httpcache-' and truncated to the maximum length.

    The documentation
    [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages)
    mentions that names can be up to 63 characters long, so the default max length is set to 60.

    Such naming isn't unique per spider, but should be sufficiently unique for most use cases.
    The name of the key value store should indicate to which spider it belongs, e.g. in
    the listing in the Apify Console.

    Args:
        spider_name: Value of the Spider instance's name attribute.
        max_length: Maximum length of the key value store name.

    Returns: Key value store name.

    Raises:
        ValueError: If the spider name contains only special characters.
    """
    slug = re.sub(r'[^a-zA-Z0-9-]', '-', spider_name)
    slug = re.sub(r'-+', '-', slug)
    slug = slug.strip('-')
    if not slug:
        raise ValueError(f'Unsupported spider name: {spider_name!r} (slug: {slug!r})')
    return f'httpcache-{slug}'[:max_length]
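A note on `read_gzip_time`: RFC 1952 fixes a 10-byte gzip member header whose bytes 4 to 7 hold a little-endian MTIME field, and `GzipFile` fills it from its `mtime` argument. That is why cache expiry can be checked without decompressing the body. A self-contained sketch, standard library only:

import gzip
import io
import struct
import time

now = int(time.time())

# Write a gzip stream whose header carries a known modification time.
buffer = io.BytesIO()
with gzip.GzipFile(fileobj=buffer, mode='wb', mtime=now) as gzip_file:
    gzip_file.write(b'payload')

# Unpack the fixed 10-byte header: magic, method, flags, MTIME, XFL, OS.
magic, _method, _flags, mtime, _xfl, _os = struct.unpack('<HBBI2B', buffer.getvalue()[:10])
assert magic == 0x8B1F  # the \x1f\x8b gzip magic, read little-endian
assert mtime == now     # the same field read_gzip_time() extracts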

src/apify/scrapy/utils.py

Lines changed: 3 additions & 0 deletions

@@ -44,6 +44,9 @@ def apply_apify_settings(*, settings: Settings | None = None, proxy_config: dict
     settings['DOWNLOADER_MIDDLEWARES']['scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware'] = None
     settings['DOWNLOADER_MIDDLEWARES']['apify.scrapy.middlewares.ApifyHttpProxyMiddleware'] = 750
 
+    # Set the default HTTPCache middleware storage backend to ApifyCacheStorage
+    settings['HTTPCACHE_STORAGE'] = 'apify.scrapy.extensions.ApifyCacheStorage'
+
     # Store the proxy configuration
     settings['APIFY_PROXY_SETTINGS'] = proxy_config

tests/unit/scrapy/extensions/__init__.py

Whitespace-only changes.
Lines changed: 71 additions & 0 deletions (new file)

from time import time

import pytest

from apify.scrapy.extensions._httpcache import from_gzip, get_kvs_name, read_gzip_time, to_gzip

FIXTURE_DICT = {'name': 'Alice'}

FIXTURE_BYTES = (
    b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s'
    b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00'
)


def test_gzip() -> None:
    assert from_gzip(to_gzip(FIXTURE_DICT)) == FIXTURE_DICT


def test_to_gzip() -> None:
    data_bytes = to_gzip(FIXTURE_DICT, mtime=0)

    assert data_bytes == FIXTURE_BYTES


def test_from_gzip() -> None:
    data_dict = from_gzip(FIXTURE_BYTES)

    assert data_dict == FIXTURE_DICT


def test_read_gzip_time() -> None:
    assert read_gzip_time(FIXTURE_BYTES) == 0


def test_read_gzip_time_non_zero() -> None:
    current_time = int(time())
    data_bytes = to_gzip(FIXTURE_DICT, mtime=current_time)

    assert read_gzip_time(data_bytes) == current_time


@pytest.mark.parametrize(
    ('spider_name', 'expected'),
    [
        ('test', 'httpcache-test'),
        ('123', 'httpcache-123'),
        ('test-spider', 'httpcache-test-spider'),
        ('test_spider', 'httpcache-test-spider'),
        ('test spider', 'httpcache-test-spider'),
        ('test👻spider', 'httpcache-test-spider'),
        ('test@spider', 'httpcache-test-spider'),
        (' test spider ', 'httpcache-test-spider'),
        ('testspider.com', 'httpcache-testspider-com'),
        ('t' * 100, 'httpcache-tttttttttttttttttttttttttttttttttttttttttttttttttt'),
    ],
)
def test_get_kvs_name(spider_name: str, expected: str) -> None:
    assert get_kvs_name(spider_name) == expected


@pytest.mark.parametrize(
    ('spider_name'),
    [
        '',
        '-',
        '-@-/-',
    ],
)
def test_get_kvs_name_raises(spider_name: str) -> None:
    with pytest.raises(ValueError, match='Unsupported spider name'):
        assert get_kvs_name(spider_name)
