Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/crawlee/storage_clients/_base/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,26 @@ async def reclaim_request(

@abstractmethod
async def is_empty(self) -> bool:
"""Check if the request queue is empty.
"""Check if the request queue is empty. That means there are no requests available to fetch.

Returns:
True if the request queue is empty, False otherwise.
"""

async def is_finished(self) -> bool:
"""Check if the request queue is finished.

A finished queue is empty and has no requests currently being processed.

Warning:
This default implementation only checks `is_empty`, which reports whether requests are available to
fetch, not whether requests are still being processed. It can therefore return `True` while requests
are in progress. Subclasses that track in-progress requests should override this method; all built-in
clients already do.

Returns:
True if the request queue is finished, False otherwise.
"""
# TODO: Make this method abstract.
# https://github.com/apify/crawlee-python/issues/1985
# Fallback: delegate to `is_empty`, with the limitation described in the warning above.
return await self.is_empty()
Comment thread
Mantisus marked this conversation as resolved.
34 changes: 24 additions & 10 deletions src/crawlee/storage_clients/_file_system/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ async def fetch_next_request(self) -> Request | None:

if next_request is not None:
state.in_progress_requests.add(next_request.unique_key)
# `in_progress_requests` is updated, so we need to invalidate the `is_empty` cache.
self._is_empty_cache = None

return next_request

Expand Down Expand Up @@ -592,34 +594,46 @@ async def is_empty(self) -> bool:

state = self._state.current_value

# If there are in-progress requests, return False immediately.
if len(state.in_progress_requests) > 0:
self._is_empty_cache = False
return False

# If we have cached requests, check them first (fast path).
if self._request_cache:
for req in self._request_cache:
if req.unique_key not in state.handled_requests:
self._is_empty_cache = False
return False
self._is_empty_cache = True
return len(state.in_progress_requests) == 0
return True

# Fallback: check state for unhandled requests.
await self._update_metadata(update_accessed_at=True)

# Check if there are any requests that are not handled
all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys())
unhandled_requests = all_requests - state.handled_requests
# Check pending requests in state.
queue_requests = (
set(state.forefront_requests.keys()) | set(state.regular_requests.keys())
) - state.in_progress_requests

if unhandled_requests:
pending_requests = queue_requests - state.handled_requests

if pending_requests:
self._is_empty_cache = False
return False

self._is_empty_cache = True
return True

@override
async def is_finished(self) -> bool:
"""Check if the request queue is finished.

The queue is finished when `is_empty` reports no requests available to fetch and the state holds no
in-progress requests.

Returns:
True if the request queue is finished, False otherwise.
"""
# If there are requests available to fetch, the queue is not finished.
if not await self.is_empty():
return False

async with self._lock:
# Re-check the `is_empty` cache: it may have been invalidated or set to False while waiting for the lock.
if self._is_empty_cache is not True:
return False

# If there are any in-progress requests, the queue is not finished.
return not self._state.current_value.in_progress_requests

def _get_request_path(self, unique_key: str) -> Path:
"""Get the path to a specific request file.

Expand Down
14 changes: 7 additions & 7 deletions src/crawlee/storage_clients/_memory/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,15 @@ async def reclaim_request(

@override
async def is_empty(self) -> bool:
"""Check if the queue is empty.

Returns:
True if the queue is empty, False otherwise.
"""
await self._update_metadata(update_accessed_at=True)

# Queue is empty if there are no pending requests and no requests in progress.
return len(self._pending_requests) == 0 and len(self._in_progress_requests) == 0
# Queue is empty if there are no pending requests.
return len(self._pending_requests) == 0

@override
async def is_finished(self) -> bool:
"""Check if the queue is finished.

Returns:
True if the queue is empty and no requests are in progress, False otherwise.
"""
# Queue is finished if it is empty and there are no in-progress requests.
return await self.is_empty() and len(self._in_progress_requests) == 0

async def _update_metadata(
self,
Expand Down
17 changes: 11 additions & 6 deletions src/crawlee/storage_clients/_redis/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,7 @@ async def reclaim_request(
@retry_on_error(RedisError)
@override
async def is_empty(self) -> bool:
"""Check if the queue is empty.

Returns:
True if the queue is empty, False otherwise.
"""
# Requests buffered for fetching mean the queue is not empty.
if self._pending_fetch_cache:
return False

Expand All @@ -509,8 +505,17 @@ async def is_empty(self) -> bool:
await self._reclaim_stale_requests()
self._next_reclaim_stale = datetime.now(tz=timezone.utc) + self._RECLAIM_INTERVAL

metadata = await self.get_metadata()
# Check if there are any requests in the queue.
requests_in_queue = await await_redis_response(self._redis.llen(self._queue_key))
return requests_in_queue == 0

@retry_on_error(RedisError)
@override
async def is_finished(self) -> bool:
"""Check if the queue is finished.

Returns:
True if the queue is empty and the metadata reports no pending requests, False otherwise.
"""
# Requests still available to fetch mean the queue is not finished.
if not await self.is_empty():
return False

# NOTE(review): `pending_request_count` presumably also covers requests that are still in progress;
# confirm against the metadata definition.
metadata = await self.get_metadata()
return metadata.pending_request_count == 0

async def _load_scripts(self) -> None:
Expand Down
29 changes: 27 additions & 2 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,34 @@ async def reclaim_request(
@retry_on_error(SQLAlchemyError)
@override
async def is_empty(self) -> bool:
# Check in-memory cache for requests
# Requests buffered for fetching mean the queue is not empty.
if self._pending_fetch_cache:
return False

now = datetime.now(timezone.utc)

# Check if there are any unhandled requests that are not blocked.
async with self.get_session(with_simple_commit=True) as session:
stmt = select(
exists().where(
self._ITEM_TABLE.request_queue_id == self._id,
self._ITEM_TABLE.is_handled == False, # noqa: E712
or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now),
)
)
result = await session.execute(stmt)

await self._add_buffer_record(session)

return not result.scalar()

@retry_on_error(SQLAlchemyError)
@override
async def is_finished(self) -> bool:
# If the queue is not empty, it is not finished
if not await self.is_empty():

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_finished calls await self.is_empty() here, which opens a DB session, and then opens a second session below for get_metadata() and the remaining queries. Since the autoscaled pool polls is_finished/is_empty while scheduling requests, this could have a performance impact.

Maybe we could handle the empty check directly inside is_finished, using only a single session?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting a session from the pool is quite cheap, but it can actually impact performance if the pool is small. However, this will only become apparent when the queue is empty but not yet finished.

But merging is_empty and get_metadata into a single session in is_finished will make the method harder to read.

If we need to optimize these methods, I would instead consider removing the _add_buffer_record calls in is_empty and is_finished, which are there only to update accessed_at in the metadata. That would have a greater impact on performance.

return False

metadata = await self.get_metadata()

async with self.get_session(with_simple_commit=True) as session:
Expand Down Expand Up @@ -629,7 +653,8 @@ async def is_empty(self) -> bool:
has_pending_buffer_updates = buffer_result.scalar()

await self._add_buffer_record(session)
# If there are no pending requests and no buffered updates, the queue is empty

# If there are no pending requests and no buffered updates, the queue is finished
return not has_pending_buffer_updates

# There are pending requests (may be inaccurate), ensure recalculated metadata
Expand Down
17 changes: 8 additions & 9 deletions src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ async def reclaim_request(
async def is_empty(self) -> bool:
"""Check if the request queue is empty.

An empty queue means that there are no requests currently in the queue, either pending or being processed.
However, this does not necessarily mean that the crawling operation is finished, as there still might be
tasks that could add additional requests to the queue.
An empty queue means that there are no requests currently available to fetch. However, this does not
necessarily mean that the crawling operation is finished, as there still might be requests being processed
or tasks that could add additional requests to the queue.

Returns:
True if the request queue is empty, False otherwise.
Expand All @@ -328,19 +328,18 @@ async def is_empty(self) -> bool:
async def is_finished(self) -> bool:
"""Check if the request queue is finished.

A finished queue means that all requests in the queue have been processed (the queue is empty) and there
are no more tasks that could add additional requests to the queue. This is the definitive way to check
if a crawling operation is complete.
A finished queue means that all requests have been processed and there are no more tasks that could add
additional requests to the queue. This is the definitive way to check if a crawling operation is complete.

Returns:
True if the request queue is finished (empty and no pending add operations), False otherwise.
True if the request queue is finished and there are no pending add operations, False otherwise.
"""
if self._add_requests_tasks:
logger.debug('Background add requests tasks are still in progress.')
return False

if await self.is_empty():
logger.debug('The request queue is empty.')
if await self._client.is_finished():
logger.debug('The request queue is finished.')
return True

return False
Expand Down
38 changes: 34 additions & 4 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ async def handler(context: BasicCrawlingContext) -> None:
]


async def test_no_new_tasks_while_only_request_in_progress() -> None:
Comment thread
Mantisus marked this conversation as resolved.
"""No new tasks should be scheduled while queue is empty and only one request is in progress."""
concurrency = 4
crawler = BasicCrawler(
concurrency_settings=ConcurrencySettings(desired_concurrency=concurrency, max_concurrency=concurrency),
)

request_manager = await crawler.get_request_manager()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
await asyncio.sleep(0.2)

with patch.object(
request_manager,
'fetch_next_request',
wraps=request_manager.fetch_next_request,
) as fetch_counter:
await crawler.run(['https://a.placeholder.com'])

# `concurrency` tasks can be scheduled if control was never yielded with `await` during task scheduling
assert 1 <= fetch_counter.call_count <= 4
Comment thread
vdusek marked this conversation as resolved.


async def test_respects_no_retry() -> None:
crawler = BasicCrawler(max_request_retries=2)
calls = list[str]()
Expand Down Expand Up @@ -1883,10 +1907,16 @@ class _CrawlerInput:


def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
return [
asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir))
for crawler_input in crawler_inputs
]
states = list[StatisticsState]()
for crawler_input in crawler_inputs:
states.append(
asyncio.run(
_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir)
)
)
# Each crawler runs in its own event loop. Drop the cached storage instances between runs.
service_locator.storage_instance_manager.clear_cache()
return states


async def test_crawler_state_persistence(tmp_path: Path) -> None:
Expand Down
20 changes: 15 additions & 5 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,23 +544,33 @@ async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None:
assert next_request.url == 'https://example.com/first'


async def test_is_empty(rq: RequestQueue) -> None:
"""Test checking if a request queue is empty."""
# Initially the queue should be empty
async def test_is_empty_and_is_finished(rq: RequestQueue) -> None:
"""Test checking if a request queue is empty and finished."""
# Initially the queue should be empty and finished
assert await rq.is_empty() is True
assert await rq.is_finished() is True

# Add a request
await rq.add_request('https://example.com')
assert await rq.is_empty() is False
assert await rq.is_finished() is False

# Fetch and handle the request
# Fetch the request
request = await rq.fetch_next_request()

assert request is not None

# Queue is empty, because there is no request for fetching
assert await rq.is_empty() is True
# Queue is not finished, because there is a request being processed
assert await rq.is_finished() is False

# Mark the request as handled
await rq.mark_request_as_handled(request)

# Queue should be empty again
# Queue should be empty and finished again
assert await rq.is_empty() is True
assert await rq.is_finished() is True


@pytest.mark.parametrize(
Expand Down
Loading