diff --git a/src/palace/manager/celery/tasks/playtime_entries.py b/src/palace/manager/celery/tasks/playtime_entries.py index 10170aed98..a54fc0e22a 100644 --- a/src/palace/manager/celery/tasks/playtime_entries.py +++ b/src/palace/manager/celery/tasks/playtime_entries.py @@ -2,7 +2,9 @@ import csv import os +import random import tempfile +import time import uuid from collections import defaultdict from collections.abc import Iterable @@ -27,6 +29,7 @@ from palace.manager.service.integration_registry.license_providers import ( LicenseProvidersRegistry, ) +from palace.manager.service.redis.models.lock import RedisLock from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.identifier import Identifier from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration @@ -145,6 +148,17 @@ def writerow(self, row: Iterable[Any]) -> Any: ... REPORT_DATE_FORMAT = "%m-%d-%Y" +# How long (seconds) to wait for the folder-creation lock before proceeding without it. +# Typed as float | int to match acquire_blocking's signature. +FOLDER_LOCK_ACQUIRE_TIMEOUT: float | int = 60 + +# Maximum seconds of random jitter added once per task invocation before the +# folder-creation loop. Independent CM groups share no Redis instance and therefore +# cannot coordinate via the lock alone; a single random offset per run spreads their +# arrival times across all folder levels without multiplying the delay by N data sources. +# Exposed as a module-level constant so tests can set it to 0. +FOLDER_CREATION_JITTER_MAX: int = 10 + @shared_task(queue=QueueNames.default, bind=True) def generate_playtime_report( @@ -184,6 +198,14 @@ def generate_playtime_report( # create directory hierarchy root_folder_id: str | None = google_drive_container.config.parent_folder_id() + redis_client = task.services.redis.client() + + # Jitter: a single random delay per task invocation spreads independent CM groups + # (which share no Redis instance) in time, reducing the probability that two groups + # race to create the same Google Drive folder. Taking it once before the loop keeps + # the per-data-source overhead constant regardless of how many sources are processed. + time.sleep(random.uniform(0, FOLDER_CREATION_JITTER_MAX)) + with task.session() as session: # get list of collections data_source_names = _fetch_distinct_eligible_data_source_names( @@ -223,12 +245,46 @@ def generate_playtime_report( reporting_name, str(start.year), ] - folder_results = google_drive.create_nested_folders_if_not_exist( - folders=nested_folders, - parent_folder_id=root_folder_id, + + # Acquire a distributed lock so that concurrent workers within the + # same Redis instance don't each create a duplicate folder hierarchy. + # Google Drive allows same-named folders and has list-API indexing + # latency, so a bare get-then-create is not safe under concurrent access. + folder_lock = RedisLock( + redis_client, + lock_name=[ + "playtime_report_folder", + root_folder_id or "default", + data_source_name, + ], + lock_timeout=timedelta(minutes=5), ) - # the leaf folder is the last path segment in the result list - leaf_folder = folder_results[-1] + try: + lock_acquired = folder_lock.acquire_blocking( + timeout=FOLDER_LOCK_ACQUIRE_TIMEOUT + ) + except Exception: + # Redis may be unavailable (connection error, timeout, etc.). + # Treat this the same as failing to acquire within the timeout: + # log a warning and proceed without the lock rather than + # aborting the entire report for this data source. + lock_acquired = False + if not lock_acquired: + task.log.warning( + f"Could not acquire folder creation lock for {data_source_name!r} " + f"within {FOLDER_LOCK_ACQUIRE_TIMEOUT} seconds. Proceeding without " + "lock — duplicate folders may be created." + ) + try: + folder_results = google_drive.create_nested_folders_if_not_exist( + folders=nested_folders, + parent_folder_id=root_folder_id, + ) + # the leaf folder is the last path segment in the result list + leaf_folder = folder_results[-1] + finally: + if lock_acquired: + folder_lock.release() # store file google_drive.create_file( diff --git a/src/palace/manager/service/google_drive/google_drive.py b/src/palace/manager/service/google_drive/google_drive.py index 7b8e41a0a8..04180cd522 100644 --- a/src/palace/manager/service/google_drive/google_drive.py +++ b/src/palace/manager/service/google_drive/google_drive.py @@ -18,17 +18,23 @@ def __init__(self, api_client: DriveResource) -> None: def get_file(self, name: str, parent_folder_id: str | None = None) -> File | None: """ - Return the first non-trashed file or folder with the given name. + Return the oldest non-trashed file or folder with the given name, or ``None``. + + Excluding trashed items prevents a folder moved to the Drive trash from + being mistaken for a live folder (which would cause uploads to land in a + trashed, invisible directory). + + Ordering by ``createdTime`` (ascending) ensures that when concurrent + workers race to create the same folder, every worker converges on the + same canonical (oldest) result regardless of which worker's ``create`` + call won the race. :param name: The exact name to search for. :param parent_folder_id: If provided, restrict the search to items whose parent is this folder/drive ID. - :return: The first matching, non-trashed ``File`` object, or ``None`` + :return: The oldest matching, non-trashed ``File`` object, or ``None`` if no match is found. """ - # Explicitly exclude trashed items so that a folder moved to the - # Drive trash is not mistaken for a live folder, which would cause - # uploads to land inside a trashed (invisible) directory. query = f"name = '{name}' and trashed = false" if parent_folder_id: @@ -40,6 +46,7 @@ def get_file(self, name: str, parent_folder_id: str | None = None) -> File | Non q=query, pageSize=10, fields="nextPageToken, files(*)", + orderBy="createdTime", supportsAllDrives=True, includeItemsFromAllDrives=True, ) diff --git a/tests/manager/celery/tasks/test_playtime_entries.py b/tests/manager/celery/tasks/test_playtime_entries.py index b0eb927e4f..5951e7b4a6 100644 --- a/tests/manager/celery/tasks/test_playtime_entries.py +++ b/tests/manager/celery/tasks/test_playtime_entries.py @@ -33,12 +33,14 @@ from palace.manager.service.integration_registry.license_providers import ( LicenseProvidersRegistry, ) +from palace.manager.service.redis.models.lock import RedisLock from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.identifier import Equivalency, Identifier from palace.manager.sqlalchemy.model.library import Library from palace.manager.sqlalchemy.model.time_tracking import PlaytimeEntry, PlaytimeSummary from tests.fixtures.celery import CeleryFixture from tests.fixtures.database import DatabaseTransactionFixture +from tests.fixtures.redis import RedisFixture from tests.fixtures.services import ServicesFixture @@ -566,6 +568,7 @@ def test_generate_playtime_reports( db: DatabaseTransactionFixture, celery_fixture: CeleryFixture, services_fixture: ServicesFixture, + redis_fixture: RedisFixture, monkeypatch: pytest.MonkeyPatch, ): identifier = db.identifier() @@ -756,6 +759,10 @@ def mock_create_file( monkeypatch.setenv( Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name ) + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX", + 0, + ) # Act generate_playtime_report.delay().wait() @@ -959,6 +966,73 @@ def mock_create_file( ], } + def test_generate_playtime_report_folder_lock_contention( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + services_fixture: ServicesFixture, + redis_fixture: RedisFixture, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, + ): + """When the folder-creation lock is already held, the task logs a warning and proceeds anyway.""" + parent_folder_id = "palace-test" + reporting_name = "test cm" + + collection = db.collection( + "collection a", + protocol=OPDS2API, + settings=db.opds2_settings(data_source="ds_a"), + ) + library = db.default_library() + identifier = db.identifier() + playtime( + db.session, + identifier, + collection, + library, + dt1m(3), + 10, + "loan_id:1", + ) + + mock_google_drive_service = create_autospec(GoogleDriveService) + drive_container = services_fixture.services.google_drive() + drive_container.config.from_dict({"parent_folder_id": parent_folder_id}) + drive_container.service.override(mock_google_drive_service) + monkeypatch.setenv( + Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name + ) + + # Pre-acquire the folder-creation lock so the task cannot get it. + pre_lock = RedisLock( + redis_fixture.client, + lock_name=["playtime_report_folder", parent_folder_id, "ds_a"], + lock_timeout=timedelta(hours=1), + ) + assert pre_lock.acquire() + + # Disable jitter and shorten the blocking timeout so the test runs quickly. + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX", + 0, + ) + monkeypatch.setattr( + "palace.manager.celery.tasks.playtime_entries.FOLDER_LOCK_ACQUIRE_TIMEOUT", + 0.1, + ) + + with caplog.at_level("WARNING"): + generate_playtime_report.delay().wait() + + # The task should have logged a warning about the lock contention. + assert any( + "Could not acquire folder creation lock" in record.message + for record in caplog.records + ) + # Despite the lock timeout, the task should still have completed and uploaded a file. + assert mock_google_drive_service.create_file.call_count == 1 + @pytest.mark.parametrize( "eligible_collections,playtime_summaries,expected_ds_names", ( diff --git a/tests/manager/service/google_drive/test_google_drive.py b/tests/manager/service/google_drive/test_google_drive.py index 0eb499fecd..4f086ec04b 100644 --- a/tests/manager/service/google_drive/test_google_drive.py +++ b/tests/manager/service/google_drive/test_google_drive.py @@ -179,3 +179,46 @@ def test_create_existing_file_fails(self): stream=BytesIO(b"Hello world"), content_type="text/plain", ) + + def test_get_file_orders_by_created_time(self): + """get_file always requests results ordered by createdTime so that the + oldest (canonical) folder is returned when duplicates exist.""" + folder_name = "my-folder" + older_id = "older-id" + newer_id = "newer-id" + http_mock_sequence = HttpMockSequence( + [ + ( + {"status": "200"}, + json.dumps( + { + "files": [ + { + "kind": "drive#file", + "id": older_id, + "name": folder_name, + "mimeType": "application/vnd.google-apps.folder", + }, + { + "kind": "drive#file", + "id": newer_id, + "name": folder_name, + "mimeType": "application/vnd.google-apps.folder", + }, + ] + } + ), + ), + ] + ) + service = drive_service(http=http_mock_sequence) + + result = service.get_file(name=folder_name) + + # The first result (oldest, per orderBy=createdTime) is returned. + assert result is not None + assert result["id"] == older_id + + # Verify orderBy=createdTime is present in the request URI. + request_uri = unquote_plus(http_mock_sequence.request_sequence[0][0]) + assert "orderBy=createdTime" in request_uri