Skip to content

refactor!: Introduce new storage client system #1194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f285707
refactor!: Introduce new storage client system
vdusek May 10, 2025
dd9be6e
Cleanup
vdusek May 10, 2025
89bfa5b
Address feedback
vdusek May 15, 2025
4050c75
Add purge_if_needed method and improve some typing based on Pylance
vdusek May 16, 2025
26f46e2
Address more feedback
vdusek May 20, 2025
c83a36a
RQ FS client improvements
vdusek Jun 4, 2025
c967fe5
Add caching to RQ FS client
vdusek Jun 5, 2025
7df046f
RQ FS performance optimization in add_requests
vdusek Jun 5, 2025
3555565
RQ FS performance issues in fetch_next_request
vdusek Jun 6, 2025
946d1e2
RQ FS fetch performance for is_empty
vdusek Jun 6, 2025
9f10b95
rm code duplication for open methods
vdusek Jun 6, 2025
0864ff8
Request loaders use async getters for handled/total req cnt
vdusek Jun 9, 2025
af0d129
Add missing_ok when removing files
vdusek Jun 9, 2025
9998a58
Improve is_empty
vdusek Jun 10, 2025
fdee111
Optimize RQ memory storage client
vdusek Jun 10, 2025
79cdfc0
Add upgrading guide and skip problematic test
vdusek Jun 11, 2025
3d2fd73
Merge branch 'master' into new-storage-clients
vdusek Jun 11, 2025
e818585
chore: update `docusaurus-plugin-typedoc-api`, fix failing docs build
barjin Jun 11, 2025
65db9ac
fix docs
vdusek Jun 11, 2025
2b786f7
add retries to atomic write
vdusek Jun 12, 2025
2cb04c5
chore(deps): update dependency pytest-cov to ~=6.2.0 (#1244)
renovate[bot] Jun 12, 2025
0c8c4ec
Fix atomic write on Windows
vdusek Jun 12, 2025
ce1eeb1
resolve write function during import time
vdusek Jun 14, 2025
4c05cee
Merge branch 'master' into new-storage-clients
vdusek Jun 14, 2025
8c80513
Update file utils
vdusek Jun 16, 2025
70bc071
revert unintentional makefile changes
vdusek Jun 16, 2025
78efb4d
Address Honza's comments (p1)
vdusek Jun 18, 2025
fa18d19
Introduce storage instance manager
vdusek Jun 19, 2025
c783dac
Utilize recoverable state for the FS RQ state
vdusek Jun 20, 2025
437071e
Details
vdusek Jun 20, 2025
df4bfa7
Rm default_"storage"_id options (were not used at all)
vdusek Jun 23, 2025
e133fcd
Update storages guide and add storage clients guide
vdusek Jun 23, 2025
76f1ffb
Docs guides - code examples
vdusek Jun 24, 2025
fa48644
Docs guides polishing
vdusek Jun 24, 2025
5c935af
docs fix lint & type checks for py 3.9
vdusek Jun 24, 2025
ac259ce
Address Honza's feedback
vdusek Jun 24, 2025
1cbf15e
SDK fixes
vdusek Jun 25, 2025
bc50990
Add KVS record_exists method
vdusek Jun 26, 2025
d1cf967
reduce test duplication for storages & storage clients
vdusek Jun 26, 2025
aa9bfd3
Create locks in async context only
vdusek Jun 27, 2025
d6c9877
rm open methods from base storage clients
vdusek Jun 27, 2025
3b133ce
update storage clients inits
vdusek Jun 30, 2025
43b9fe9
async metadata getter
vdusek Jul 1, 2025
b628fbb
better typing in storage instance manager
vdusek Jul 1, 2025
9dfac4b
update upgrading guide
vdusek Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/crawlee/_service_locator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from crawlee._utils.docs import docs_group
from crawlee.configuration import Configuration
from crawlee.errors import ServiceConflictError
from crawlee.events import EventManager, LocalEventManager
from crawlee.storage_clients import FileSystemStorageClient, StorageClient

if TYPE_CHECKING:
from crawlee.storages._storage_instance_manager import StorageInstanceManager


@docs_group('Classes')
class ServiceLocator:
Expand All @@ -18,6 +23,7 @@ def __init__(self) -> None:
self._configuration: Configuration | None = None
self._event_manager: EventManager | None = None
self._storage_client: StorageClient | None = None
self._storage_instance_manager: StorageInstanceManager | None = None

# Flags to check if the services were already set.
self._configuration_was_retrieved = False
Expand Down Expand Up @@ -94,5 +100,16 @@ def set_storage_client(self, storage_client: StorageClient) -> None:

self._storage_client = storage_client

@property
def storage_instance_manager(self) -> StorageInstanceManager:
    """Get the storage instance manager, creating it lazily on first access."""
    if self._storage_instance_manager is not None:
        return self._storage_instance_manager

    # Local import breaks the circular dependency between this module
    # and crawlee.storages.
    from crawlee.storages._storage_instance_manager import StorageInstanceManager

    self._storage_instance_manager = StorageInstanceManager()
    return self._storage_instance_manager


service_locator = ServiceLocator()
8 changes: 6 additions & 2 deletions src/crawlee/_utils/recoverable_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

from crawlee import service_locator
from crawlee.events._types import Event, EventPersistStateData
from crawlee.storages._key_value_store import KeyValueStore

if TYPE_CHECKING:
import logging

from crawlee.storages._key_value_store import KeyValueStore

TStateModel = TypeVar('TStateModel', bound=BaseModel)


Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(
self._persist_state_key = persist_state_key
self._persist_state_kvs_name = persist_state_kvs_name
self._persist_state_kvs_id = persist_state_kvs_id
self._key_value_store: KeyValueStore | None = None
self._key_value_store: 'KeyValueStore | None' = None # noqa: UP037
self._log = logger

async def initialize(self) -> TStateModel:
Expand All @@ -75,6 +76,9 @@ async def initialize(self) -> TStateModel:
self._state = self._default_state.model_copy(deep=True)
return self.current_value

# Import here to avoid circular imports
from crawlee.storages._key_value_store import KeyValueStore

self._key_value_store = await KeyValueStore.open(
name=self._persist_state_kvs_name, id=self._persist_state_kvs_id
)
Expand Down
26 changes: 5 additions & 21 deletions src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@

from ._base import Storage
from ._key_value_store import KeyValueStore
from ._utils import open_storage_instance

if TYPE_CHECKING:
from collections.abc import AsyncIterator
from typing import Any, ClassVar, Literal
from typing import Any, Literal

from typing_extensions import Unpack

Expand Down Expand Up @@ -66,15 +65,6 @@ class Dataset(Storage):
```
"""

_cache_by_id: ClassVar[dict[str, Dataset]] = {}
"""A dictionary to cache datasets by ID."""

_cache_by_name: ClassVar[dict[str, Dataset]] = {}
"""A dictionary to cache datasets by name."""

_default_instance: ClassVar[Dataset | None] = None
"""Cache for the default dataset instance."""

def __init__(self, client: DatasetClient) -> None:
"""Initialize a new instance.

Expand Down Expand Up @@ -112,25 +102,19 @@ async def open(
) -> Dataset:
configuration = service_locator.get_configuration() if configuration is None else configuration
storage_client = service_locator.get_storage_client() if storage_client is None else storage_client
return await open_storage_instance(

return await service_locator.storage_instance_manager.open_storage_instance(
cls,
id=id,
name=name,
configuration=configuration,
cache_by_id=cls._cache_by_id,
cache_by_name=cls._cache_by_name,
default_instance_attr='_default_instance',
client_opener=storage_client.create_dataset_client,
)

@override
async def drop(self) -> None:
if self.id in self._cache_by_id:
del self._cache_by_id[self.id]

if self.name in self._cache_by_name:
del self._cache_by_name[self.name]

storage_instance_manager = service_locator.storage_instance_manager
storage_instance_manager.remove_from_cache(self)
await self._client.drop()

@override
Expand Down
28 changes: 7 additions & 21 deletions src/crawlee/storages/_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
from crawlee import service_locator
from crawlee._types import JsonSerializable # noqa: TC001
from crawlee._utils.docs import docs_group
from crawlee._utils.recoverable_state import RecoverableState
from crawlee.storage_clients.models import KeyValueStoreMetadata

from ._base import Storage
from ._utils import open_storage_instance

if TYPE_CHECKING:
from collections.abc import AsyncIterator

from crawlee._utils.recoverable_state import RecoverableState
from crawlee.configuration import Configuration
from crawlee.storage_clients import StorageClient
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecordMetadata
else:
from crawlee._utils.recoverable_state import RecoverableState

T = TypeVar('T')

Expand Down Expand Up @@ -65,15 +66,6 @@ class KeyValueStore(Storage):
```
"""

_cache_by_id: ClassVar[dict[str, KeyValueStore]] = {}
"""A dictionary to cache key-value stores by ID."""

_cache_by_name: ClassVar[dict[str, KeyValueStore]] = {}
"""A dictionary to cache key-value stores by name."""

_default_instance: ClassVar[KeyValueStore | None] = None
"""Cache for the default key-value store instance."""

_autosaved_values: ClassVar[
dict[
str,
Expand Down Expand Up @@ -120,23 +112,19 @@ async def open(
) -> KeyValueStore:
configuration = service_locator.get_configuration() if configuration is None else configuration
storage_client = service_locator.get_storage_client() if storage_client is None else storage_client
return await open_storage_instance(

return await service_locator.storage_instance_manager.open_storage_instance(
cls,
id=id,
name=name,
configuration=configuration,
cache_by_id=cls._cache_by_id,
cache_by_name=cls._cache_by_name,
default_instance_attr='_default_instance',
client_opener=storage_client.create_kvs_client,
)

@override
async def drop(self) -> None:
if self.id in self._cache_by_id:
del self._cache_by_id[self.id]
if self.name is not None and self.name in self._cache_by_name:
del self._cache_by_name[self.name]
storage_instance_manager = service_locator.storage_instance_manager
storage_instance_manager.remove_from_cache(self)

await self._clear_cache() # Clear cache with persistent values.
await self._client.drop()
Expand Down Expand Up @@ -259,8 +247,6 @@ async def get_auto_saved_value(
Returns:
Return the value of the key.
"""
from crawlee._utils.recoverable_state import RecoverableState

default_value = {} if default_value is None else default_value

async with self._autosave_lock:
Expand Down
25 changes: 5 additions & 20 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING, ClassVar, TypeVar
from typing import TYPE_CHECKING, TypeVar

from typing_extensions import override

Expand All @@ -13,7 +13,6 @@
from crawlee.request_loaders import RequestManager

from ._base import Storage
from ._utils import open_storage_instance

if TYPE_CHECKING:
from collections.abc import Sequence
Expand Down Expand Up @@ -71,15 +70,6 @@ class RequestQueue(Storage, RequestManager):
```
"""

_cache_by_id: ClassVar[dict[str, RequestQueue]] = {}
"""A dictionary to cache request queues by ID."""

_cache_by_name: ClassVar[dict[str, RequestQueue]] = {}
"""A dictionary to cache request queues by name."""

_default_instance: ClassVar[RequestQueue | None] = None
"""Cache for the default request queue instance."""

def __init__(self, client: RequestQueueClient) -> None:
"""Initialize a new instance.

Expand Down Expand Up @@ -128,25 +118,20 @@ async def open(
) -> RequestQueue:
configuration = service_locator.get_configuration() if configuration is None else configuration
storage_client = service_locator.get_storage_client() if storage_client is None else storage_client
return await open_storage_instance(

return await service_locator.storage_instance_manager.open_storage_instance(
cls,
id=id,
name=name,
configuration=configuration,
cache_by_id=cls._cache_by_id,
cache_by_name=cls._cache_by_name,
default_instance_attr='_default_instance',
client_opener=storage_client.create_rq_client,
)

@override
async def drop(self) -> None:
# Remove from cache before dropping
if self.id in self._cache_by_id:
del self._cache_by_id[self.id]

if self.name is not None and self.name in self._cache_by_name:
del self._cache_by_name[self.name]
storage_instance_manager = service_locator.storage_instance_manager
storage_instance_manager.remove_from_cache(self)

await self._client.drop()

Expand Down
124 changes: 124 additions & 0 deletions src/crawlee/storages/_storage_instance_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from __future__ import annotations

from typing import Any, Callable, TypeVar, cast

from crawlee._utils.docs import docs_group

from ._base import Storage

T = TypeVar('T', bound='Storage')


@docs_group('Classes')
class StorageInstanceManager:
    """Manager for caching and managing storage instances.

    This class centralizes the caching logic for all storage types (Dataset, KeyValueStore, RequestQueue)
    and provides a unified interface for opening and managing storage instances.
    """

    def __init__(self) -> None:
        # Cache for storage instances by ID, partitioned by storage type.
        self._cache_by_id = dict[type[Storage], dict[str, Storage]]()

        # Cache for storage instances by name, partitioned by storage type.
        self._cache_by_name = dict[type[Storage], dict[str, Storage]]()

        # Cache for the default (opened without id/name) instance of each storage type.
        self._default_instances = dict[type[Storage], Storage]()

    async def open_storage_instance(
        self,
        cls: type[T],
        *,
        id: str | None,
        name: str | None,
        configuration: Any,
        client_opener: Callable[..., Any],
    ) -> T:
        """Open a storage instance with caching support.

        Args:
            cls: The storage class to instantiate.
            id: Storage ID.
            name: Storage name.
            configuration: Configuration object passed through to the client opener.
            client_opener: Awaitable factory that creates the underlying storage client.

        Returns:
            The cached or newly created storage instance.

        Raises:
            ValueError: If both id and name are specified.
        """
        # Identity checks (not truthiness) so that an empty string still counts
        # as "specified" and cannot silently bypass this validation.
        if id is not None and name is not None:
            raise ValueError('Only one of "id" or "name" can be specified, not both.')

        # Check for a cached default instance (opened without id and name).
        if id is None and name is None and cls in self._default_instances:
            return cast('T', self._default_instances[cls])

        # Check the per-type caches. `isinstance` also rejects a `None` miss.
        if id is not None:
            cached_instance = self._cache_by_id.get(cls, {}).get(id)
            if isinstance(cached_instance, cls):
                return cached_instance

        if name is not None:
            cached_instance = self._cache_by_name.get(cls, {}).get(name)
            if isinstance(cached_instance, cls):
                return cached_instance

        # Cache miss: create a new client and wrap it in the storage class.
        client = await client_opener(id=id, name=name, configuration=configuration)
        instance = cls(client)  # type: ignore[call-arg]
        instance_name = getattr(instance, 'name', None)

        # Cache the new instance under its ID and, when present, its name.
        self._cache_by_id.setdefault(cls, {})[instance.id] = instance
        if instance_name is not None:
            self._cache_by_name.setdefault(cls, {})[instance_name] = instance

        # Remember it as the default instance when opened without id/name.
        if id is None and name is None:
            self._default_instances[cls] = instance

        return instance

    def remove_from_cache(self, storage_instance: Storage) -> None:
        """Remove a storage instance from all caches.

        Args:
            storage_instance: The storage instance to remove.
        """
        storage_type = type(storage_instance)

        # Remove from the ID cache.
        type_cache_by_id = self._cache_by_id.get(storage_type, {})
        if storage_instance.id in type_cache_by_id:
            del type_cache_by_id[storage_instance.id]

        # Remove from the name cache (unnamed storages are never cached by name).
        if storage_instance.name is not None:
            type_cache_by_name = self._cache_by_name.get(storage_type, {})
            if storage_instance.name in type_cache_by_name:
                del type_cache_by_name[storage_instance.name]

        # Drop it as the default instance only if it is the exact same object.
        if self._default_instances.get(storage_type) is storage_instance:
            del self._default_instances[storage_type]

    def clear_cache(self) -> None:
        """Clear all cached storage instances."""
        self._cache_by_id.clear()
        self._cache_by_name.clear()
        self._default_instances.clear()
Loading
Loading