Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions src/palace/manager/celery/tasks/playtime_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import csv
import os
import random
import tempfile
import time
import uuid
from collections import defaultdict
from collections.abc import Iterable
Expand All @@ -27,6 +29,7 @@
from palace.manager.service.integration_registry.license_providers import (
LicenseProvidersRegistry,
)
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.identifier import Identifier
from palace.manager.sqlalchemy.model.integration import IntegrationConfiguration
Expand Down Expand Up @@ -145,6 +148,17 @@ def writerow(self, row: Iterable[Any]) -> Any: ...

REPORT_DATE_FORMAT = "%m-%d-%Y"

# How long (seconds) to wait for the folder-creation lock before proceeding without it.
# Typed as float | int to match acquire_blocking's signature.
FOLDER_LOCK_ACQUIRE_TIMEOUT: float | int = 60

# Maximum seconds of random jitter added once per task invocation before the
# folder-creation loop. Independent CM groups share no Redis instance and therefore
# cannot coordinate via the lock alone; a single random offset per run spreads their
# arrival times across all folder levels without multiplying the delay by N data sources.
# Exposed as a module-level constant so tests can set it to 0.
FOLDER_CREATION_JITTER_MAX: int = 10


@shared_task(queue=QueueNames.default, bind=True)
def generate_playtime_report(
Expand Down Expand Up @@ -184,6 +198,14 @@ def generate_playtime_report(
# create directory hierarchy
root_folder_id: str | None = google_drive_container.config.parent_folder_id()

redis_client = task.services.redis.client()

# Jitter: a single random delay per task invocation spreads independent CM groups
# (which share no Redis instance) in time, reducing the probability that two groups
# race to create the same Google Drive folder. Taking it once before the loop keeps
# the per-data-source overhead constant regardless of how many sources are processed.
time.sleep(random.uniform(0, FOLDER_CREATION_JITTER_MAX))

with task.session() as session:
# get list of collections
data_source_names = _fetch_distinct_eligible_data_source_names(
Expand Down Expand Up @@ -223,12 +245,46 @@ def generate_playtime_report(
reporting_name,
str(start.year),
]
folder_results = google_drive.create_nested_folders_if_not_exist(
folders=nested_folders,
parent_folder_id=root_folder_id,

# Acquire a distributed lock so that concurrent workers within the
# same Redis instance don't each create a duplicate folder hierarchy.
# Google Drive allows same-named folders and has list-API indexing
# latency, so a bare get-then-create is not safe under concurrent access.
folder_lock = RedisLock(
redis_client,
lock_name=[
"playtime_report_folder",
root_folder_id or "default",
data_source_name,
],
lock_timeout=timedelta(minutes=5),
)
# the leaf folder is the last path segment in the result list
leaf_folder = folder_results[-1]
try:
lock_acquired = folder_lock.acquire_blocking(
timeout=FOLDER_LOCK_ACQUIRE_TIMEOUT
)
except Exception:
# Redis may be unavailable (connection error, timeout, etc.).
# Treat this the same as failing to acquire within the timeout:
# log a warning and proceed without the lock rather than
# aborting the entire report for this data source.
lock_acquired = False
if not lock_acquired:
task.log.warning(
f"Could not acquire folder creation lock for {data_source_name!r} "
f"within {FOLDER_LOCK_ACQUIRE_TIMEOUT} seconds. Proceeding without "
"lock — duplicate folders may be created."
)
try:
folder_results = google_drive.create_nested_folders_if_not_exist(
folders=nested_folders,
parent_folder_id=root_folder_id,
)
# the leaf folder is the last path segment in the result list
leaf_folder = folder_results[-1]
finally:
if lock_acquired:
folder_lock.release()

# store file
google_drive.create_file(
Expand Down
17 changes: 12 additions & 5 deletions src/palace/manager/service/google_drive/google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ def __init__(self, api_client: DriveResource) -> None:

def get_file(self, name: str, parent_folder_id: str | None = None) -> File | None:
"""
Return the first non-trashed file or folder with the given name.
Return the oldest non-trashed file or folder with the given name, or ``None``.

Excluding trashed items prevents a folder moved to the Drive trash from
being mistaken for a live folder (which would cause uploads to land in a
trashed, invisible directory).

Ordering by ``createdTime`` (ascending) ensures that when concurrent
workers race to create the same folder, every worker converges on the
same canonical (oldest) result regardless of which worker's ``create``
call won the race.

:param name: The exact name to search for.
:param parent_folder_id: If provided, restrict the search to items
whose parent is this folder/drive ID.
:return: The first matching, non-trashed ``File`` object, or ``None``
:return: The oldest matching, non-trashed ``File`` object, or ``None``
if no match is found.
"""
# Explicitly exclude trashed items so that a folder moved to the
# Drive trash is not mistaken for a live folder, which would cause
# uploads to land inside a trashed (invisible) directory.
query = f"name = '{name}' and trashed = false"

if parent_folder_id:
Expand All @@ -40,6 +46,7 @@ def get_file(self, name: str, parent_folder_id: str | None = None) -> File | Non
q=query,
pageSize=10,
fields="nextPageToken, files(*)",
orderBy="createdTime",
supportsAllDrives=True,
includeItemsFromAllDrives=True,
)
Expand Down
74 changes: 74 additions & 0 deletions tests/manager/celery/tasks/test_playtime_entries.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
from palace.manager.service.integration_registry.license_providers import (
LicenseProvidersRegistry,
)
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.identifier import Equivalency, Identifier
from palace.manager.sqlalchemy.model.library import Library
from palace.manager.sqlalchemy.model.time_tracking import PlaytimeEntry, PlaytimeSummary
from tests.fixtures.celery import CeleryFixture
from tests.fixtures.database import DatabaseTransactionFixture
from tests.fixtures.redis import RedisFixture
from tests.fixtures.services import ServicesFixture


Expand Down Expand Up @@ -566,6 +568,7 @@ def test_generate_playtime_reports(
db: DatabaseTransactionFixture,
celery_fixture: CeleryFixture,
services_fixture: ServicesFixture,
redis_fixture: RedisFixture,
monkeypatch: pytest.MonkeyPatch,
):
identifier = db.identifier()
Expand Down Expand Up @@ -756,6 +759,10 @@ def mock_create_file(
monkeypatch.setenv(
Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name
)
monkeypatch.setattr(
"palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX",
0,
)

# Act
generate_playtime_report.delay().wait()
Expand Down Expand Up @@ -959,6 +966,73 @@ def mock_create_file(
],
}

def test_generate_playtime_report_folder_lock_contention(
self,
db: DatabaseTransactionFixture,
celery_fixture: CeleryFixture,
services_fixture: ServicesFixture,
redis_fixture: RedisFixture,
monkeypatch: pytest.MonkeyPatch,
caplog: pytest.LogCaptureFixture,
):
"""When the folder-creation lock is already held, the task logs a warning and proceeds anyway."""
parent_folder_id = "palace-test"
reporting_name = "test cm"

collection = db.collection(
"collection a",
protocol=OPDS2API,
settings=db.opds2_settings(data_source="ds_a"),
)
library = db.default_library()
identifier = db.identifier()
playtime(
db.session,
identifier,
collection,
library,
dt1m(3),
10,
"loan_id:1",
)

mock_google_drive_service = create_autospec(GoogleDriveService)
drive_container = services_fixture.services.google_drive()
drive_container.config.from_dict({"parent_folder_id": parent_folder_id})
drive_container.service.override(mock_google_drive_service)
monkeypatch.setenv(
Configuration.REPORTING_NAME_ENVIRONMENT_VARIABLE, reporting_name
)

# Pre-acquire the folder-creation lock so the task cannot get it.
pre_lock = RedisLock(
redis_fixture.client,
lock_name=["playtime_report_folder", parent_folder_id, "ds_a"],
lock_timeout=timedelta(hours=1),
)
assert pre_lock.acquire()

# Disable jitter and shorten the blocking timeout so the test runs quickly.
monkeypatch.setattr(
"palace.manager.celery.tasks.playtime_entries.FOLDER_CREATION_JITTER_MAX",
0,
)
monkeypatch.setattr(
"palace.manager.celery.tasks.playtime_entries.FOLDER_LOCK_ACQUIRE_TIMEOUT",
0.1,
)

with caplog.at_level("WARNING"):
generate_playtime_report.delay().wait()

# The task should have logged a warning about the lock contention.
assert any(
"Could not acquire folder creation lock" in record.message
for record in caplog.records
)
# Despite the lock timeout, the task should still have completed and uploaded a file.
assert mock_google_drive_service.create_file.call_count == 1

@pytest.mark.parametrize(
"eligible_collections,playtime_summaries,expected_ds_names",
(
Expand Down
43 changes: 43 additions & 0 deletions tests/manager/service/google_drive/test_google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,46 @@ def test_create_existing_file_fails(self):
stream=BytesIO(b"Hello world"),
content_type="text/plain",
)

def test_get_file_orders_by_created_time(self):
"""get_file always requests results ordered by createdTime so that the
oldest (canonical) folder is returned when duplicates exist."""
folder_name = "my-folder"
older_id = "older-id"
newer_id = "newer-id"
http_mock_sequence = HttpMockSequence(
[
(
{"status": "200"},
json.dumps(
{
"files": [
{
"kind": "drive#file",
"id": older_id,
"name": folder_name,
"mimeType": "application/vnd.google-apps.folder",
},
{
"kind": "drive#file",
"id": newer_id,
"name": folder_name,
"mimeType": "application/vnd.google-apps.folder",
},
]
}
),
),
]
)
service = drive_service(http=http_mock_sequence)

result = service.get_file(name=folder_name)

# The first result (oldest, per orderBy=createdTime) is returned.
assert result is not None
assert result["id"] == older_id

# Verify orderBy=createdTime is present in the request URI.
request_uri = unquote_plus(http_mock_sequence.request_sequence[0][0])
assert "orderBy=createdTime" in request_uri
Loading