Skip to content

Commit 7f2e6b0

Browse files
committed
RQ FS fetch performance for is_empty
1 parent 6bbd977 commit 7f2e6b0

File tree

3 files changed

+44
-33
lines changed

3 files changed

+44
-33
lines changed

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,7 @@ async def open(
160160
metadata_path = dataset_path / METADATA_FILENAME
161161

162162
# If the dataset directory exists, reconstruct the client from the metadata file.
163-
if dataset_path.exists():
164-
# If metadata file is missing, raise an error.
165-
if not metadata_path.exists():
166-
raise ValueError(f'Metadata file not found for dataset "{name}"')
167-
163+
if dataset_path.exists() and metadata_path.exists():
168164
file = await asyncio.to_thread(open, metadata_path)
169165
try:
170166
file_content = json.load(file)

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,7 @@ async def open(
154154
metadata_path = kvs_path / METADATA_FILENAME
155155

156156
# If the key-value store directory exists, reconstruct the client from the metadata file.
157-
if kvs_path.exists():
158-
# If metadata file is missing, raise an error.
159-
if not metadata_path.exists():
160-
raise ValueError(f'Metadata file not found for key-value store "{name}"')
161-
157+
if kvs_path.exists() and metadata_path.exists():
162158
file = await asyncio.to_thread(open, metadata_path)
163159
try:
164160
file_content = json.load(file)

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,12 @@ def __init__(
118118
self._request_cache = deque[Request]()
119119
"""Cache for requests: forefront requests at the beginning, regular requests at the end."""
120120

121-
self._cache_needs_refresh = True
121+
self._request_cache_needs_refresh = True
122122
"""Flag indicating whether the cache needs to be refreshed from filesystem."""
123123

124+
self._is_empty_cache: bool | None = None
125+
"""Cache for is_empty result: None means unknown, True/False is cached state."""
126+
124127
@property
125128
@override
126129
def metadata(self) -> RequestQueueMetadata:
@@ -202,11 +205,7 @@ async def open(
202205
metadata_path = rq_path / METADATA_FILENAME
203206

204207
# If the RQ directory exists, reconstruct the client from the metadata file.
205-
if rq_path.exists():
206-
# If metadata file is missing, raise an error.
207-
if not metadata_path.exists():
208-
raise ValueError(f'Metadata file not found for request queue "{name}"')
209-
208+
if rq_path.exists() and metadata_path.exists():
210209
file = await asyncio.to_thread(open, metadata_path)
211210
try:
212211
file_content = json.load(file)
@@ -260,7 +259,10 @@ async def drop(self) -> None:
260259

261260
self._in_progress.clear()
262261
self._request_cache.clear()
263-
self._cache_needs_refresh = True
262+
self._request_cache_needs_refresh = True
263+
264+
# Invalidate is_empty cache.
265+
self._is_empty_cache = None
264266

265267
@override
266268
async def purge(self) -> None:
@@ -272,15 +274,17 @@ async def purge(self) -> None:
272274

273275
self._in_progress.clear()
274276
self._request_cache.clear()
275-
self._cache_needs_refresh = True
277+
self._request_cache_needs_refresh = True
276278

277-
# Update metadata counts
278279
await self._update_metadata(
279280
update_modified_at=True,
280281
update_accessed_at=True,
281282
new_pending_request_count=0,
282283
)
283284

285+
# Invalidate is_empty cache.
286+
self._is_empty_cache = None
287+
284288
@override
285289
async def add_batch_of_requests(
286290
self,
@@ -298,6 +302,7 @@ async def add_batch_of_requests(
298302
Response containing information about the added requests.
299303
"""
300304
async with self._lock:
305+
self._is_empty_cache = None
301306
new_total_request_count = self._metadata.total_request_count
302307
new_pending_request_count = self._metadata.pending_request_count
303308
processed_requests = list[ProcessedRequest]()
@@ -409,7 +414,10 @@ async def add_batch_of_requests(
409414

410415
# Invalidate the cache if we added forefront requests.
411416
if forefront:
412-
self._cache_needs_refresh = True
417+
self._request_cache_needs_refresh = True
418+
419+
# Invalidate is_empty cache.
420+
self._is_empty_cache = None
413421

414422
return AddRequestsResponse(
415423
processed_requests=processed_requests,
@@ -450,7 +458,7 @@ async def fetch_next_request(self) -> Request | None:
450458
"""
451459
async with self._lock:
452460
# Refresh cache if needed or if it's empty.
453-
if self._cache_needs_refresh or not self._request_cache:
461+
if self._request_cache_needs_refresh or not self._request_cache:
454462
await self._refresh_cache()
455463

456464
next_request: Request | None = None
@@ -481,6 +489,8 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
481489
Information about the queue operation. `None` if the given request was not in progress.
482490
"""
483491
async with self._lock:
492+
self._is_empty_cache = None
493+
484494
# Check if the request is in progress.
485495
if request.id not in self._in_progress:
486496
logger.warning(f'Marking request {request.id} as handled that is not in progress.')
@@ -537,6 +547,8 @@ async def reclaim_request(
537547
Information about the queue operation. `None` if the given request was not in progress.
538548
"""
539549
async with self._lock:
550+
self._is_empty_cache = None
551+
540552
# Check if the request is in progress.
541553
if request.id not in self._in_progress:
542554
logger.info(f'Reclaiming request {request.id} that is not in progress.')
@@ -587,28 +599,35 @@ async def reclaim_request(
587599

588600
@override
589601
async def is_empty(self) -> bool:
590-
"""Check if the queue is empty.
591-
592-
Returns:
593-
True if the queue is empty, False otherwise.
594-
"""
602+
"""Check if the queue is empty, using a cached value if available and valid."""
595603
async with self._lock:
604+
# If we have a cached value, return it immediately.
605+
if self._is_empty_cache is not None:
606+
return self._is_empty_cache
607+
608+
# If we have cached requests, check them first (fast path).
609+
if self._request_cache:
610+
for req in self._request_cache:
611+
if req.handled_at is None:
612+
self._is_empty_cache = False
613+
return False
614+
self._is_empty_cache = True
615+
return True
616+
617+
# Fallback: check files on disk (slow path).
596618
await self._update_metadata(update_accessed_at=True)
597619
request_files = await self._get_request_files(self.path_to_rq)
598620

599-
# Check each file to see if there are any unhandled requests.
600621
for request_file in request_files:
601622
request = await self._parse_request_file(request_file)
602-
603623
if request is None:
604624
continue
605-
606-
# If any request is not handled, the queue is not empty.
607625
if request.handled_at is None:
626+
self._is_empty_cache = False
608627
return False
609628

610-
# If we got here, all requests are handled or there are no requests.
611-
return True
629+
self._is_empty_cache = True
630+
return True
612631

613632
def _get_request_path(self, request_id: str) -> Path:
614633
"""Get the path to a specific request file.
@@ -732,7 +751,7 @@ async def _refresh_cache(self) -> None:
732751
break
733752
self._request_cache.append(request)
734753

735-
self._cache_needs_refresh = False
754+
self._request_cache_needs_refresh = False
736755

737756
@classmethod
738757
async def _get_request_files(cls, path_to_rq: Path) -> list[Path]:

0 commit comments

Comments
 (0)