From 1434925cc96c1c366ed014f0fded6cb49a5dc1d1 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 21 May 2026 17:15:28 -0700 Subject: [PATCH 1/5] Serialise Google Drive folder creation with a distributed Redis lock (PP-XXXX) Multiple Celery workers running generate_playtime_report concurrently all call create_nested_folders_if_not_exist at the same time. Google Drive allows same-named folders and its list-API has indexing latency, so all workers see "folder not found" and each creates a duplicate. A distributed Redis lock scoped per (root_folder_id, data_source_name) serialises the check-and-create step. If a worker cannot acquire the lock within FOLDER_LOCK_ACQUIRE_TIMEOUT seconds it logs a warning and proceeds without the lock (graceful degradation). Co-Authored-By: Claude Sonnet 4.6 --- .../manager/celery/tasks/playtime_entries.py | 44 +++++++++++-- .../celery/tasks/test_playtime_entries.py | 66 +++++++++++++++++++ 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index 10170aed98..3c578e7fb2 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -27,6 +27,7 @@ from palace.manager.service.integration_registry.license_providers import ( LicenseProvidersRegistry, ) +from palace.manager.service.redis.models.lock import RedisLock from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.identifier import Identifier from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration @@ -145,6 +146,10 @@ def writerow(self, row: Iterable[Any]) -> Any: ... REPORT_DATE_FORMAT = "%m-%d-%Y" +# How long (seconds) to wait for the folder-creation lock before proceeding without it. +# Exposed as a module-level constant so tests can monkeypatch a short value. +FOLDER_LOCK_ACQUIRE_TIMEOUT: int = 60 + @shared_task(queue=QueueNames.default, bind=True) def generate_playtime_report( @@ -184,6 +189,8 @@ def generate_playtime_report( # create directory hierarchy root_folder_id: str | None = google_drive_container.config.parent_folder_id() + redis_client = task.services.redis.client() + with task.session() as session: # get list of collections data_source_names = _fetch_distinct_eligible_data_source_names( @@ -223,12 +230,39 @@ def generate_playtime_report( reporting_name, str(start.year), ] - folder_results = google_drive.create_nested_folders_if_not_exist( - folders=nested_folders, - parent_folder_id=root_folder_id, + + # Acquire a distributed lock so that concurrent workers don't each + # create a duplicate folder hierarchy. Google Drive allows same-named + # folders and has list-API indexing latency, so a bare + # get-then-create is not safe under concurrent access. + folder_lock = RedisLock( + redis_client, + lock_name=[ + "playtime_report_folder", + root_folder_id or "default", + data_source_name, + ], + lock_timeout=timedelta(minutes=5), + ) + lock_acquired = folder_lock.acquire_blocking( + timeout=FOLDER_LOCK_ACQUIRE_TIMEOUT ) - # the leaf folder is the last path segment in the result list - leaf_folder = folder_results[-1] + if not lock_acquired: + task.log.warning( + f"Could not acquire folder creation lock for {data_source_name!r} " + f"within {FOLDER_LOCK_ACQUIRE_TIMEOUT} seconds. Proceeding without " + "lock — duplicate folders may be created." + ) + try: + folder_results = google_drive.create_nested_folders_if_not_exist( + folders=nested_folders, + parent_folder_id=root_folder_id, + ) + # the leaf folder is the last path segment in the result list + leaf_folder = folder_results[-1] + finally: + if lock_acquired: + folder_lock.release() # store file google_drive.create_file( diff --git a/tests/manager/celery/tasks/test_playtime_entries.py b/tests/manager/celery/tasks/test_playtime_entries.py index b0eb927e4f..51594e45bd 100644 --- a/tests/manager/celery/tasks/test_playtime_entries.py +++ b/tests/manager/celery/tasks/test_playtime_entries.py @@ -33,12 +33,14 @@ from palace.manager.service.integration_registry.license_providers import ( LicenseProvidersRegistry, ) +from palace.manager.service.redis.models.lock import RedisLock from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.identifier import Equivalency, Identifier from palace.manager.sqlalchemy.model.library import Library from palace.manager.sqlalchemy.model.time_tracking import PlaytimeEntry, PlaytimeSummary from tests.fixtures.celery import CeleryFixture from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.redis import RedisFixture from tests.fixtures.services import ServicesFixture @@ -566,6 +568,7 @@ def test_generate_playtime_reports( db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, services_fixture: ServicesFixture, + redis_fixture: RedisFixture, monkeypatch: pytest.MonkeyPatch, ): identifier = db.identifier() @@ -959,6 +962,69 @@ def mock_create_file( ], } + def test_generate_playtime_report_folder_lock_contention( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + redis_fixture: RedisFixture, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, + ): + """When the folder-creation lock is already held, the task logs a warning and proceeds anyway.""" + parent_folder_id = "palace-test" + reporting_name = "test cm" + + collection = db.collection( + "collection a", + protocol=OPDS2API, + settings=db.opds2_settings(data_source="ds_a"), + ) + library = db.default_library() + identifier = db.identifier() + playtime( + db.session, + identifier, + collection, + library, + dt1m(3), + 10, + "loan_id:1", + ) + + mock_google_drive_service = create_autospec(GoogleDriveService) + drive_container = services_fixture.services.google_drive() + drive_container.config.from_dict({"parent_folder_id": parent_folder_id}) + drive_container.service.override(mock_google_drive_service) + monkeypatch.setenv( + Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name + ) + + # Pre-acquire the folder-creation lock so the task cannot get it. + pre_lock = RedisLock( + redis_fixture.client, + lock_name=["playtime_report_folder", parent_folder_id, "ds_a"], + lock_timeout=timedelta(hours=1), + ) + assert pre_lock.acquire() + + # Shorten the task's blocking timeout so the test runs quickly. + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_LOCK_ACQUIRE_TIMEOUT", + 0.1, + ) + + with caplog.at_level("WARNING"): + generate_playtime_report.delay().wait() + + # The task should have logged a warning about the lock contention. + assert any( + "Could not acquire folder creation lock" in record.message + for record in caplog.records + ) + # Despite the lock timeout, the task should still have completed and uploaded a file. + assert mock_google_drive_service.create_file.call_count == 1 + @pytest.mark.parametrize( "eligible_collections,playtime_summaries,expected_ds_names", ( From 6e5d38922c1466708b780756879078ea4809daf7 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Thu, 21 May 2026 21:10:25 -0700 Subject: [PATCH 2/5] Add jitter + pick-oldest to prevent cross-group Google Drive folder races MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three independent layers now guard against duplicate folder creation: 1. Redis lock (existing) — serialises workers within the same Redis instance (i.e. the same CM group), eliminating intra-group races entirely. 2. Random jitter (new) — each worker sleeps random.uniform(0, 30) seconds before acquiring the lock, spreading independent CM groups in time and reducing the probability that two groups race to create the same folder. 3. Pick-oldest (new) — get_file now passes orderBy=createdTime to files.list, so it always returns the oldest matching folder first. If two groups do race and each creates a copy, every worker converges on the same canonical (oldest) folder for all subsequent levels and the file upload. Orphaned empty duplicates may exist but no files are misplaced. Co-Authored-By: Claude Sonnet 4.6 --- .../manager/celery/tasks/playtime_entries.py | 21 +++++++-- .../service/google_drive/google_drive.py | 17 +++++--- .../celery/tasks/test_playtime_entries.py | 10 ++++- .../service/google_drive/test_google_drive.py | 43 +++++++++++++++++++ 4 files changed, 81 insertions(+), 10 deletions(-) diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index 3c578e7fb2..31dbe2e9e6 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -2,7 +2,9 @@ import csv import os +import random import tempfile +import time import uuid from collections import defaultdict from collections.abc import Iterable @@ -150,6 +152,13 @@ def writerow(self, row: Iterable[Any]) -> Any: ... # Exposed as a module-level constant so tests can monkeypatch a short value. FOLDER_LOCK_ACQUIRE_TIMEOUT: int = 60 +# Maximum seconds of random jitter added before acquiring the folder-creation lock. +# Independent CM groups share no Redis instance and therefore cannot coordinate via +# the lock alone; spreading their start times reduces the probability that two groups +# race to create the same Google Drive folder at the same moment. +# Exposed as a module-level constant so tests can set it to 0. +FOLDER_CREATION_JITTER_MAX: int = 30 + @shared_task(queue=QueueNames.default, bind=True) def generate_playtime_report( @@ -231,10 +240,14 @@ def generate_playtime_report( str(start.year), ] - # Acquire a distributed lock so that concurrent workers don't each - # create a duplicate folder hierarchy. Google Drive allows same-named - # folders and has list-API indexing latency, so a bare - # get-then-create is not safe under concurrent access. + # Jitter: spread out independent CM groups (which share no Redis) + # so they are less likely to race when creating the same folder. + time.sleep(random.uniform(0, FOLDER_CREATION_JITTER_MAX)) + + # Acquire a distributed lock so that concurrent workers within the + # same Redis instance don't each create a duplicate folder hierarchy. + # Google Drive allows same-named folders and has list-API indexing + # latency, so a bare get-then-create is not safe under concurrent access. folder_lock = RedisLock( redis_client, lock_name=[ diff --git a/src/palace/manager/service/google_drive/google_drive.py b/src/palace/manager/service/google_drive/google_drive.py index 7b8e41a0a8..04180cd522 100644 --- a/src/palace/manager/service/google_drive/google_drive.py +++ b/src/palace/manager/service/google_drive/google_drive.py @@ -18,17 +18,23 @@ def __init__(self, api_client: DriveResource) -> None: def get_file(self, name: str, parent_folder_id: str | None = None) -> File | None: """ - Return the first non-trashed file or folder with the given name. + Return the oldest non-trashed file or folder with the given name, or ``None``. + + Excluding trashed items prevents a folder moved to the Drive trash from + being mistaken for a live folder (which would cause uploads to land in a + trashed, invisible directory). + + Ordering by ``createdTime`` (ascending) ensures that when concurrent + workers race to create the same folder, every worker converges on the + same canonical (oldest) result regardless of which worker's ``create`` + call won the race. :param name: The exact name to search for. :param parent_folder_id: If provided, restrict the search to items whose parent is this folder/drive ID. - :return: The first matching, non-trashed ``File`` object, or ``None`` + :return: The oldest matching, non-trashed ``File`` object, or ``None`` if no match is found. """ - # Explicitly exclude trashed items so that a folder moved to the - # Drive trash is not mistaken for a live folder, which would cause - # uploads to land inside a trashed (invisible) directory. query = f"name = '{name}' and trashed = false" if parent_folder_id: @@ -40,6 +46,7 @@ def get_file(self, name: str, parent_folder_id: str | None = None) -> File | Non q=query, pageSize=10, fields="nextPageToken, files(*)", + orderBy="createdTime", supportsAllDrives=True, includeItemsFromAllDrives=True, ) diff --git a/tests/manager/celery/tasks/test_playtime_entries.py b/tests/manager/celery/tasks/test_playtime_entries.py index 51594e45bd..5951e7b4a6 100644 --- a/tests/manager/celery/tasks/test_playtime_entries.py +++ b/tests/manager/celery/tasks/test_playtime_entries.py @@ -759,6 +759,10 @@ def mock_create_file( monkeypatch.setenv( Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name ) + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX", + 0, + ) # Act generate_playtime_report.delay().wait() @@ -1008,7 +1012,11 @@ def test_generate_playtime_report_folder_lock_contention( ) assert pre_lock.acquire() - # Shorten the task's blocking timeout so the test runs quickly. + # Disable jitter and shorten the blocking timeout so the test runs quickly. + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX", + 0, + ) monkeypatch.setattr( "palace.manager.celery.tasks.playtime_entries.FOLDER_LOCK_ACQUIRE_TIMEOUT", 0.1, diff --git a/tests/manager/service/google_drive/test_google_drive.py b/tests/manager/service/google_drive/test_google_drive.py index 0eb499fecd..4f086ec04b 100644 --- a/tests/manager/service/google_drive/test_google_drive.py +++ b/tests/manager/service/google_drive/test_google_drive.py @@ -179,3 +179,46 @@ def test_create_existing_file_fails(self): stream=BytesIO(b"Hello world"), content_type="text/plain", ) + + def test_get_file_orders_by_created_time(self): + """get_file always requests results ordered by createdTime so that the + oldest (canonical) folder is returned when duplicates exist.""" + folder_name = "my-folder" + older_id = "older-id" + newer_id = "newer-id" + http_mock_sequence = HttpMockSequence( + [ + ( + {"status": "200"}, + json.dumps( + { + "files": [ + { + "kind": "drive#file", + "id": older_id, + "name": folder_name, + "mimeType": "application/vnd.google-apps.folder", + }, + { + "kind": "drive#file", + "id": newer_id, + "name": folder_name, + "mimeType": "application/vnd.google-apps.folder", + }, + ] + } + ), + ), + ] + ) + service = drive_service(http=http_mock_sequence) + + result = service.get_file(name=folder_name) + + # The first result (oldest, per orderBy=createdTime) is returned. + assert result is not None + assert result["id"] == older_id + + # Verify orderBy=createdTime is present in the request URI. + request_uri = unquote_plus(http_mock_sequence.request_sequence[0][0]) + assert "orderBy=createdTime" in request_uri From c2033244cfa507c6154420de9d85751f57687664 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Wed, 27 May 2026 13:51:16 -0700 Subject: [PATCH 3/5] Fix FOLDER_LOCK_ACQUIRE_TIMEOUT type annotation and move jitter before loop - Change FOLDER_LOCK_ACQUIRE_TIMEOUT type from int to float | int so it matches acquire_blocking's signature and tests can monkeypatch it to a small float (e.g. 0.1) without mypy complaints. - Reduce FOLDER_CREATION_JITTER_MAX from 30 to 10 seconds. - Move the single jitter sleep before the data-source loop so the delay is incurred once per task invocation rather than once per data source, keeping per-data-source overhead constant. Co-Authored-By: Claude Sonnet 4.6 --- .../manager/celery/tasks/playtime_entries.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index 31dbe2e9e6..c2706b5c6d 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -149,15 +149,16 @@ def writerow(self, row: Iterable[Any]) -> Any: ... REPORT_DATE_FORMAT = "%m-%d-%Y" # How long (seconds) to wait for the folder-creation lock before proceeding without it. -# Exposed as a module-level constant so tests can monkeypatch a short value. -FOLDER_LOCK_ACQUIRE_TIMEOUT: int = 60 - -# Maximum seconds of random jitter added before acquiring the folder-creation lock. -# Independent CM groups share no Redis instance and therefore cannot coordinate via -# the lock alone; spreading their start times reduces the probability that two groups -# race to create the same Google Drive folder at the same moment. +# Typed as float | int to match acquire_blocking's signature; tests monkeypatch this +# to a small float (e.g. 0.1) to avoid waiting a full 60 seconds. +FOLDER_LOCK_ACQUIRE_TIMEOUT: float | int = 60 + +# Maximum seconds of random jitter added once per task invocation before the +# folder-creation loop. Independent CM groups share no Redis instance and therefore +# cannot coordinate via the lock alone; a single random offset per run spreads their +# arrival times across all folder levels without multiplying the delay by N data sources. # Exposed as a module-level constant so tests can set it to 0. -FOLDER_CREATION_JITTER_MAX: int = 30 +FOLDER_CREATION_JITTER_MAX: int = 10 @shared_task(queue=QueueNames.default, bind=True) @@ -200,6 +201,12 @@ def generate_playtime_report( redis_client = task.services.redis.client() + # Jitter: a single random delay per task invocation spreads independent CM groups + # (which share no Redis instance) in time, reducing the probability that two groups + # race to create the same Google Drive folder. Taking it once before the loop keeps + # the per-data-source overhead constant regardless of how many sources are processed. + time.sleep(random.uniform(0, FOLDER_CREATION_JITTER_MAX)) + with task.session() as session: # get list of collections data_source_names = _fetch_distinct_eligible_data_source_names( @@ -240,10 +247,6 @@ def generate_playtime_report( str(start.year), ] - # Jitter: spread out independent CM groups (which share no Redis) - # so they are less likely to race when creating the same folder. - time.sleep(random.uniform(0, FOLDER_CREATION_JITTER_MAX)) - # Acquire a distributed lock so that concurrent workers within the # same Redis instance don't each create a duplicate folder hierarchy. # Google Drive allows same-named folders and has list-API indexing From 6cad07a7b114f082870845b8952c9c73c1145cb4 Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Fri, 29 May 2026 10:56:11 -0700 Subject: [PATCH 4/5] Remove implementation detail from FOLDER_LOCK_ACQUIRE_TIMEOUT comment Co-Authored-By: Claude Sonnet 4.6 --- src/palace/manager/celery/tasks/playtime_entries.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index c2706b5c6d..9edc1569b8 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -149,8 +149,7 @@ def writerow(self, row: Iterable[Any]) -> Any: ... REPORT_DATE_FORMAT = "%m-%d-%Y" # How long (seconds) to wait for the folder-creation lock before proceeding without it. -# Typed as float | int to match acquire_blocking's signature; tests monkeypatch this -# to a small float (e.g. 0.1) to avoid waiting a full 60 seconds. +# Typed as float | int to match acquire_blocking's signature. FOLDER_LOCK_ACQUIRE_TIMEOUT: float | int = 60 # Maximum seconds of random jitter added once per task invocation before the From 436a21046ed4ab9435d7a9b3c5b481409dbbb85a Mon Sep 17 00:00:00 2001 From: Daniel Bernstein Date: Fri, 29 May 2026 11:00:11 -0700 Subject: [PATCH 5/5] Handle Redis errors during lock acquisition as graceful degradation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If Redis is unavailable (connection error, timeout, etc.) acquire_blocking raises rather than returning False, bypassing the fallback path entirely. Catching the exception and treating it as lock_acquired=False ensures the task proceeds without the lock — logging a warning — instead of aborting. Co-Authored-By: Claude Sonnet 4.6 --- src/palace/manager/celery/tasks/playtime_entries.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index 9edc1569b8..a54fc0e22a 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -259,9 +259,16 @@ def generate_playtime_report( ], lock_timeout=timedelta(minutes=5), ) - lock_acquired = folder_lock.acquire_blocking( - timeout=FOLDER_LOCK_ACQUIRE_TIMEOUT - ) + try: + lock_acquired = folder_lock.acquire_blocking( + timeout=FOLDER_LOCK_ACQUIRE_TIMEOUT + ) + except Exception: + # Redis may be unavailable (connection error, timeout, etc.). + # Treat this the same as failing to acquire within the timeout: + # log a warning and proceed without the lock rather than + # aborting the entire report for this data source. + lock_acquired = False if not lock_acquired: task.log.warning( f"Could not acquire folder creation lock for {data_source_name!r} "