From d8ddc933188ecd5de2577c39aba8bb89d7e50450 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Sun, 21 Jun 2026 20:56:32 +0000 Subject: [PATCH 1/7] add `is_finished` method to `RequestQueueClient` --- .../_base/_request_queue_client.py | 10 +++- .../_file_system/_request_queue_client.py | 57 +++++-------------- .../_memory/_request_queue_client.py | 14 ++++- .../_redis/_request_queue_client.py | 17 ++++-- .../_sql/_request_queue_client.py | 29 +++++++++- src/crawlee/storages/_request_queue.py | 17 +++--- .../crawlers/_basic/test_basic_crawler.py | 35 ++++++++++-- tests/unit/storages/test_request_queue.py | 20 +++++-- 8 files changed, 126 insertions(+), 73 deletions(-) diff --git a/src/crawlee/storage_clients/_base/_request_queue_client.py b/src/crawlee/storage_clients/_base/_request_queue_client.py index a993fcfdb3..df0fe996e5 100644 --- a/src/crawlee/storage_clients/_base/_request_queue_client.py +++ b/src/crawlee/storage_clients/_base/_request_queue_client.py @@ -124,8 +124,16 @@ async def reclaim_request( @abstractmethod async def is_empty(self) -> bool: - """Check if the request queue is empty. + """Check if the request queue is empty. That means there are no requests available to fetch. Returns: True if the request queue is empty, False otherwise. """ + + async def is_finished(self) -> bool: + """Check if the request queue is finished. That means the queue is empty and no requests are being processed. + + Returns: + True if the request queue is finished, False otherwise. + """ + return await self.is_empty() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 024a50b8a9..0b5ec6b9e3 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -114,9 +114,6 @@ def __init__( self._request_cache_needs_refresh = True """Flag indicating whether the cache needs to be refreshed from filesystem.""" - self._is_empty_cache: bool | None = None - """Cache for is_empty result: None means unknown, True/False is cached state.""" - self._state = recoverable_state """Recoverable state to maintain request ordering, in-progress status, and handled status.""" @@ -291,9 +288,6 @@ async def drop(self) -> None: self._request_cache.clear() self._request_cache_needs_refresh = True - # Invalidate is_empty cache. - self._is_empty_cache = None - @override async def purge(self) -> None: async with self._lock: @@ -315,9 +309,6 @@ async def purge(self) -> None: new_total_request_count=0, ) - # Invalidate is_empty cache. - self._is_empty_cache = None - @override async def add_batch_of_requests( self, @@ -326,7 +317,6 @@ async def add_batch_of_requests( forefront: bool = False, ) -> AddRequestsResponse: async with self._lock: - self._is_empty_cache = None new_total_request_count = self._metadata.total_request_count new_pending_request_count = self._metadata.pending_request_count processed_requests = list[ProcessedRequest]() @@ -435,9 +425,6 @@ async def add_batch_of_requests( if forefront: self._request_cache_needs_refresh = True - # Invalidate is_empty cache. - self._is_empty_cache = None - return AddRequestsResponse( processed_requests=processed_requests, unprocessed_requests=unprocessed_requests, @@ -482,7 +469,6 @@ async def fetch_next_request(self) -> Request | None: @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: async with self._lock: - self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -530,7 +516,6 @@ async def reclaim_request( forefront: bool = False, ) -> ProcessedRequest | None: async with self._lock: - self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -586,39 +571,23 @@ async def reclaim_request( @override async def is_empty(self) -> bool: async with self._lock: - # If we have a cached value, return it immediately. - if self._is_empty_cache is not None: - return self._is_empty_cache - - state = self._state.current_value - - # If there are in-progress requests, return False immediately. - if len(state.in_progress_requests) > 0: - self._is_empty_cache = False - return False - - # If we have a cached requests, check them first (fast path). - if self._request_cache: - for req in self._request_cache: - if req.unique_key not in state.handled_requests: - self._is_empty_cache = False - return False - self._is_empty_cache = True - return len(state.in_progress_requests) == 0 - - # Fallback: check state for unhandled requests. await self._update_metadata(update_accessed_at=True) - # Check if there are any requests that are not handled - all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) - unhandled_requests = all_requests - state.handled_requests + # The queue is empty when nothing is available to fetch, i.e. every unhandled request is + # currently in progress. + return self._metadata.pending_request_count - len(self._state.current_value.in_progress_requests) <= 0 + + @override + async def is_finished(self) -> bool: + # If anything is still available to fetch, the queue is not finished. + if not await self.is_empty(): + return False - if unhandled_requests: - self._is_empty_cache = False - return False + async with self._lock: + await self._update_metadata(update_accessed_at=True) - self._is_empty_cache = True - return True + # The queue is finished when there are no pending requests and no in-progress requests. + return self._metadata.pending_request_count == 0 and not self._state.current_value.in_progress_requests def _get_request_path(self, unique_key: str) -> Path: """Get the path to a specific request file. diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index 90b47c63d8..2bcdd60766 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -314,8 +314,18 @@ async def is_empty(self) -> bool: """ await self._update_metadata(update_accessed_at=True) - # Queue is empty if there are no pending requests and no requests in progress. - return len(self._pending_requests) == 0 and len(self._in_progress_requests) == 0 + # Queue is empty if there are no pending requests. + return len(self._pending_requests) == 0 + + @override + async def is_finished(self) -> bool: + """Check if the queue is finished. + + Returns: + True if the queue is finished, False otherwise. + """ + # Queue is finished if it is empty and there are no in-progress requests. + return await self.is_empty() and len(self._in_progress_requests) == 0 async def _update_metadata( self, diff --git a/src/crawlee/storage_clients/_redis/_request_queue_client.py b/src/crawlee/storage_clients/_redis/_request_queue_client.py index b2defb752e..4cefa87789 100644 --- a/src/crawlee/storage_clients/_redis/_request_queue_client.py +++ b/src/crawlee/storage_clients/_redis/_request_queue_client.py @@ -496,11 +496,7 @@ async def reclaim_request( @retry_on_error(RedisError) @override async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ + # Requests buffered for fetching mean the queue is not empty. if self._pending_fetch_cache: return False @@ -509,9 +505,18 @@ async def is_empty(self) -> bool: await self._reclaim_stale_requests() self._next_reclaim_stale = datetime.now(tz=timezone.utc) + self._RECLAIM_INTERVAL + # Check if there are any requests in the queue. + requests_in_queue = await await_redis_response(self._redis.llen(self._queue_key)) + return requests_in_queue == 0 + + @retry_on_error(RedisError) + @override + async def is_finished(self) -> bool: + is_empty = await self.is_empty() + metadata = await self.get_metadata() - return metadata.pending_request_count == 0 + return is_empty and metadata.pending_request_count == 0 async def _load_scripts(self) -> None: """Ensure Lua scripts are loaded in Redis.""" diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py index e3e75bf3c3..99630a3bd8 100644 --- a/src/crawlee/storage_clients/_sql/_request_queue_client.py +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -595,10 +595,34 @@ async def reclaim_request( @retry_on_error(SQLAlchemyError) @override async def is_empty(self) -> bool: - # Check in-memory cache for requests + # Requests buffered for fetching mean the queue is not empty. if self._pending_fetch_cache: return False + now = datetime.now(timezone.utc) + + # Check if there are any unhandled requests that are not blocked. + async with self.get_session(with_simple_commit=True) as session: + stmt = select( + exists().where( + self._ITEM_TABLE.request_queue_id == self._id, + self._ITEM_TABLE.is_handled == False, # noqa: E712 + or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), + ) + ) + result = await session.execute(stmt) + + await self._add_buffer_record(session) + + return not result.scalar() + + @retry_on_error(SQLAlchemyError) + @override + async def is_finished(self) -> bool: + # If the queue is not empty, it is not finished + if not await self.is_empty(): + return False + metadata = await self.get_metadata() async with self.get_session(with_simple_commit=True) as session: @@ -629,7 +653,8 @@ async def is_empty(self) -> bool: has_pending_buffer_updates = buffer_result.scalar() await self._add_buffer_record(session) - # If there are no pending requests and no buffered updates, the queue is empty + + # If there are no pending requests and no buffered updates, the queue is finished return not has_pending_buffer_updates # There are pending requests (may be inaccurate), ensure recalculated metadata diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index eaa93785c9..e29bda4af9 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -316,9 +316,9 @@ async def reclaim_request( async def is_empty(self) -> bool: """Check if the request queue is empty. - An empty queue means that there are no requests currently in the queue, either pending or being processed. - However, this does not necessarily mean that the crawling operation is finished, as there still might be - tasks that could add additional requests to the queue. + An empty queue means that there are no requests currently available to fetch. However, this does not + necessarily mean that the crawling operation is finished, as there still might be requests being processed + or tasks that could add additional requests to the queue. Returns: True if the request queue is empty, False otherwise. @@ -328,19 +328,18 @@ async def is_empty(self) -> bool: async def is_finished(self) -> bool: """Check if the request queue is finished. - A finished queue means that all requests in the queue have been processed (the queue is empty) and there - are no more tasks that could add additional requests to the queue. This is the definitive way to check - if a crawling operation is complete. + A finished queue means that all requests have been processed and there are no more tasks that could add + additional requests to the queue. This is the definitive way to check if a crawling operation is complete. Returns: - True if the request queue is finished (empty and no pending add operations), False otherwise. + True if the request queue is finished and no pending add operations, False otherwise. """ if self._add_requests_tasks: logger.debug('Background add requests tasks are still in progress.') return False - if await self.is_empty(): - logger.debug('The request queue is empty.') + if await self._client.is_finished(): + logger.debug('The request queue is finished.') return True return False diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 6a39e83c12..e23df42886 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -142,6 +142,27 @@ async def handler(context: BasicCrawlingContext) -> None: ] +async def test_no_new_tasks_while_only_request_in_progress() -> None: + crawler = BasicCrawler( + concurrency_settings=ConcurrencySettings(desired_concurrency=4, max_concurrency=4), + ) + + request_manager = await crawler.get_request_manager() + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + await asyncio.sleep(0.2) + + with patch.object( + request_manager, + 'fetch_next_request', + wraps=request_manager.fetch_next_request, + ) as fetch_counter: + await crawler.run(['https://a.placeholder.com']) + + fetch_counter.assert_called_once() + + async def test_respects_no_retry() -> None: crawler = BasicCrawler(max_request_retries=2) calls = list[str]() @@ -1883,10 +1904,16 @@ class _CrawlerInput: def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]: - return [ - asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)) - for crawler_input in crawler_inputs - ] + states = list[StatisticsState]() + for crawler_input in crawler_inputs: + states.append( + asyncio.run( + _run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir) + ) + ) + # Each crawler runs in its own event loop. Drop the cached storage instances between runs. + service_locator.storage_instance_manager.clear_cache() + return states async def test_crawler_state_persistence(tmp_path: Path) -> None: diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 2f12137625..ebcec63a1b 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -544,23 +544,33 @@ async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None: assert next_request.url == 'https://example.com/first' -async def test_is_empty(rq: RequestQueue) -> None: - """Test checking if a request queue is empty.""" - # Initially the queue should be empty +async def test_is_empty_and_is_finished(rq: RequestQueue) -> None: + """Test checking if a request queue is empty and finished.""" + # Initially the queue should be empty and finished assert await rq.is_empty() is True + assert await rq.is_finished() is True # Add a request await rq.add_request('https://example.com') assert await rq.is_empty() is False + assert await rq.is_finished() is False - # Fetch and handle the request + # Fetch the request request = await rq.fetch_next_request() assert request is not None + + # Queue is empty, because there is no request for fetching + assert await rq.is_empty() is True + # Queue is not finished, because there is a request being processed + assert await rq.is_finished() is False + + # Mark the request as handled await rq.mark_request_as_handled(request) - # Queue should be empty again + # Queue should be empty and finished again assert await rq.is_empty() is True + assert await rq.is_finished() is True @pytest.mark.parametrize( From 59f6e9b4c7b0f1d2ac498b5092acca2875d7ddcd Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 22 Jun 2026 17:10:36 +0000 Subject: [PATCH 2/7] return to cache logic for `FileSystemRequestQueueClient` --- .../_file_system/_request_queue_client.py | 59 ++++++++++++++++--- .../crawlers/_basic/test_basic_crawler.py | 6 +- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 0b5ec6b9e3..e950747e10 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -114,6 +114,9 @@ def __init__( self._request_cache_needs_refresh = True """Flag indicating whether the cache needs to be refreshed from filesystem.""" + self._is_empty_cache: bool | None = None + """Cache for is_empty result: None means unknown, True/False is cached state.""" + self._state = recoverable_state """Recoverable state to maintain request ordering, in-progress status, and handled status.""" @@ -288,6 +291,9 @@ async def drop(self) -> None: self._request_cache.clear() self._request_cache_needs_refresh = True + # Invalidate is_empty cache. + self._is_empty_cache = None + @override async def purge(self) -> None: async with self._lock: @@ -309,6 +315,9 @@ async def purge(self) -> None: new_total_request_count=0, ) + # Invalidate is_empty cache. + self._is_empty_cache = None + @override async def add_batch_of_requests( self, @@ -317,6 +326,7 @@ async def add_batch_of_requests( forefront: bool = False, ) -> AddRequestsResponse: async with self._lock: + self._is_empty_cache = None new_total_request_count = self._metadata.total_request_count new_pending_request_count = self._metadata.pending_request_count processed_requests = list[ProcessedRequest]() @@ -425,6 +435,9 @@ async def add_batch_of_requests( if forefront: self._request_cache_needs_refresh = True + # Invalidate is_empty cache. + self._is_empty_cache = None + return AddRequestsResponse( processed_requests=processed_requests, unprocessed_requests=unprocessed_requests, @@ -463,12 +476,15 @@ async def fetch_next_request(self) -> Request | None: if next_request is not None: state.in_progress_requests.add(next_request.unique_key) + # `in_progress_requests` is updated, so we need to invalidate the `is_empty` cache. + self._is_empty_cache = None return next_request @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: async with self._lock: + self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -516,6 +532,7 @@ async def reclaim_request( forefront: bool = False, ) -> ProcessedRequest | None: async with self._lock: + self._is_empty_cache = None state = self._state.current_value # Check if the request is in progress. @@ -571,23 +588,51 @@ async def reclaim_request( @override async def is_empty(self) -> bool: async with self._lock: + # If we have a cached value, return it immediately. + if self._is_empty_cache is not None: + return self._is_empty_cache + + state = self._state.current_value + + # If we have a cached requests, check them first (fast path). + if self._request_cache: + for req in self._request_cache: + if req.unique_key not in state.handled_requests: + self._is_empty_cache = False + return False + self._is_empty_cache = True + return True + + # Fallback: check state for unhandled requests. await self._update_metadata(update_accessed_at=True) - # The queue is empty when nothing is available to fetch, i.e. every unhandled request is - # currently in progress. - return self._metadata.pending_request_count - len(self._state.current_value.in_progress_requests) <= 0 + # Check pending requests is state. + queue_requests = ( + set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) + ) - state.in_progress_requests + + pending_requests = queue_requests - state.handled_requests + + if pending_requests: + self._is_empty_cache = False + return False + + self._is_empty_cache = True + return True @override async def is_finished(self) -> bool: - # If anything is still available to fetch, the queue is not finished. + # If there are requests available to fetch, the queue is not finished. if not await self.is_empty(): return False async with self._lock: - await self._update_metadata(update_accessed_at=True) + # Check, if cache changed while waiting for the lock. + if self._is_empty_cache is not True: + return False - # The queue is finished when there are no pending requests and no in-progress requests. - return self._metadata.pending_request_count == 0 and not self._state.current_value.in_progress_requests + # If there are any in-progress requests, the queue is not finished. + return not self._state.current_value.in_progress_requests def _get_request_path(self, unique_key: str) -> Path: """Get the path to a specific request file. diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index e23df42886..ba53bd0a3a 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -143,8 +143,9 @@ async def handler(context: BasicCrawlingContext) -> None: async def test_no_new_tasks_while_only_request_in_progress() -> None: + concurrency = 4 crawler = BasicCrawler( - concurrency_settings=ConcurrencySettings(desired_concurrency=4, max_concurrency=4), + concurrency_settings=ConcurrencySettings(desired_concurrency=concurrency, max_concurrency=concurrency), ) request_manager = await crawler.get_request_manager() @@ -160,7 +161,8 @@ async def handler(context: BasicCrawlingContext) -> None: ) as fetch_counter: await crawler.run(['https://a.placeholder.com']) - fetch_counter.assert_called_once() + # `concurency` tasks can be scheduled if control was never yielded with `await` during task scheduling + assert fetch_counter.call_count <= 4 async def test_respects_no_retry() -> None: From 6433c378b58ffa8d233d3c232b05425bb6ade1a5 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 22 Jun 2026 17:12:41 +0000 Subject: [PATCH 3/7] fix typo --- tests/unit/crawlers/_basic/test_basic_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index ba53bd0a3a..5344bf33d1 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -161,7 +161,7 @@ async def handler(context: BasicCrawlingContext) -> None: ) as fetch_counter: await crawler.run(['https://a.placeholder.com']) - # `concurency` tasks can be scheduled if control was never yielded with `await` during task scheduling + # `concurrency` tasks can be scheduled if control was never yielded with `await` during task scheduling assert fetch_counter.call_count <= 4 From 7760d6262e86cbf2695e266b47786a2b3da0fdc1 Mon Sep 17 00:00:00 2001 From: Max Bohomolov <34358312+Mantisus@users.noreply.github.com> Date: Tue, 23 Jun 2026 14:35:11 +0300 Subject: [PATCH 4/7] Apply suggestion from @vdusek Co-authored-by: Vlada Dusek --- .../storage_clients/_file_system/_request_queue_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index e950747e10..b0c37ecf22 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -606,7 +606,7 @@ async def is_empty(self) -> bool: # Fallback: check state for unhandled requests. await self._update_metadata(update_accessed_at=True) - # Check pending requests is state. + # Check pending requests in state. queue_requests = ( set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) ) - state.in_progress_requests From 53619194ac66268a7f80a292d8dc2e81a96c41a6 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 23 Jun 2026 12:00:24 +0000 Subject: [PATCH 5/7] polish --- .../storage_clients/_memory/_request_queue_client.py | 10 ---------- .../storage_clients/_redis/_request_queue_client.py | 7 ++++--- tests/unit/crawlers/_basic/test_basic_crawler.py | 2 +- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/crawlee/storage_clients/_memory/_request_queue_client.py b/src/crawlee/storage_clients/_memory/_request_queue_client.py index 2bcdd60766..ae98cc9ad6 100644 --- a/src/crawlee/storage_clients/_memory/_request_queue_client.py +++ b/src/crawlee/storage_clients/_memory/_request_queue_client.py @@ -307,11 +307,6 @@ async def reclaim_request( @override async def is_empty(self) -> bool: - """Check if the queue is empty. - - Returns: - True if the queue is empty, False otherwise. - """ await self._update_metadata(update_accessed_at=True) # Queue is empty if there are no pending requests. @@ -319,11 +314,6 @@ async def is_empty(self) -> bool: @override async def is_finished(self) -> bool: - """Check if the queue is finished. - - Returns: - True if the queue is finished, False otherwise. - """ # Queue is finished if it is empty and there are no in-progress requests. return await self.is_empty() and len(self._in_progress_requests) == 0 diff --git a/src/crawlee/storage_clients/_redis/_request_queue_client.py b/src/crawlee/storage_clients/_redis/_request_queue_client.py index 4cefa87789..914d4ba02a 100644 --- a/src/crawlee/storage_clients/_redis/_request_queue_client.py +++ b/src/crawlee/storage_clients/_redis/_request_queue_client.py @@ -512,11 +512,12 @@ async def is_empty(self) -> bool: @retry_on_error(RedisError) @override async def is_finished(self) -> bool: - is_empty = await self.is_empty() - metadata = await self.get_metadata() + if not await self.is_empty(): + return False - return is_empty and metadata.pending_request_count == 0 + metadata = await self.get_metadata() + return metadata.pending_request_count == 0 async def _load_scripts(self) -> None: """Ensure Lua scripts are loaded in Redis.""" diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 5344bf33d1..5ca74f03a2 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -162,7 +162,7 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run(['https://a.placeholder.com']) # `concurrency` tasks can be scheduled if control was never yielded with `await` during task scheduling - assert fetch_counter.call_count <= 4 + assert 1 <= fetch_counter.call_count <= 4 async def test_respects_no_retry() -> None: From e913621c3e9b63f4f93c2d6a3d28c9432310c91f Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 23 Jun 2026 12:27:09 +0000 Subject: [PATCH 6/7] add warning --- .../storage_clients/_base/_request_queue_client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_base/_request_queue_client.py b/src/crawlee/storage_clients/_base/_request_queue_client.py index df0fe996e5..bd3529031b 100644 --- a/src/crawlee/storage_clients/_base/_request_queue_client.py +++ b/src/crawlee/storage_clients/_base/_request_queue_client.py @@ -131,9 +131,19 @@ async def is_empty(self) -> bool: """ async def is_finished(self) -> bool: - """Check if the request queue is finished. That means the queue is empty and no requests are being processed. + """Check if the request queue is finished. + + A finished queue is empty and has no requests currently being processed. + + Warning: + This default only checks `is_empty`, which reports whether requests are available to fetch, not + whether requests are still being processed. It can therefore return `True` while requests are in + progress. Subclasses that track in-progress requests should override this method; all built-in + clients already do. Returns: True if the request queue is finished, False otherwise. """ + # TODO: Make this method abstract. + # https://github.com/apify/crawlee-python/issues/1985 return await self.is_empty() From d69fff24ecd43087e3cf4a07249b3534d54ebfdb Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Thu, 25 Jun 2026 15:30:38 +0000 Subject: [PATCH 7/7] fix --- src/crawlee/storage_clients/_redis/_request_queue_client.py | 1 - tests/unit/crawlers/_basic/test_basic_crawler.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crawlee/storage_clients/_redis/_request_queue_client.py b/src/crawlee/storage_clients/_redis/_request_queue_client.py index 914d4ba02a..7290318023 100644 --- a/src/crawlee/storage_clients/_redis/_request_queue_client.py +++ b/src/crawlee/storage_clients/_redis/_request_queue_client.py @@ -512,7 +512,6 @@ async def is_empty(self) -> bool: @retry_on_error(RedisError) @override async def is_finished(self) -> bool: - if not await self.is_empty(): return False diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 5ca74f03a2..0391d65843 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -143,6 +143,7 @@ async def handler(context: BasicCrawlingContext) -> None: async def test_no_new_tasks_while_only_request_in_progress() -> None: + """No new tasks should be scheduled while queue is empty and only one request is in progress.""" concurrency = 4 crawler = BasicCrawler( concurrency_settings=ConcurrencySettings(desired_concurrency=concurrency, max_concurrency=concurrency),