Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
default_language_version:
python: python3.12
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
Expand Down
11 changes: 0 additions & 11 deletions bin/bibliotheca_purchase_monitor

This file was deleted.

8 changes: 8 additions & 0 deletions bin/bibliotheca_purchase_record_import
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python
"""Command-line entry point that manually starts the Bibliotheca purchase record import.

The import can be started for one collection or for all collections.
"""

from palace.manager.integration.license.bibliotheca_scripts import (
    ImportPurchaseRecordCollection,
)

# Build the script object first, then run it; all of the actual work
# happens inside ``run()``.
import_script = ImportPurchaseRecordCollection()
import_script.run()
187 changes: 180 additions & 7 deletions src/palace/manager/celery/tasks/bibliotheca.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
"""Celery tasks for Bibliotheca (3M Cloud) collection management.

Four tasks handle near-real-time event import and historical purchase record import:

- ``import_all_collections``: Fans out to one ``import_collection`` task per collection.
- ``import_collection``: Processes one time slice of circulation events, then
re-queues itself via ``task.replace()`` until the collection is caught up, holding
a Redis workflow lock across the chain so at most one run proceeds per collection.
- ``import_purchase_records_for_all_collections``: Fans out to one
``import_purchase_records_by_collection`` task per collection.
- ``import_purchase_records_by_collection``: Processes one page of MARC purchase
records, then re-queues itself via ``task.replace()`` until the collection is
caught up to ``utc_now()``, holding a separate Redis workflow lock so at most one
purchase record import run proceeds per collection at a time.
"""

from __future__ import annotations

from datetime import datetime
from datetime import datetime, timedelta
from uuid import uuid4

from celery import shared_task
Expand All @@ -26,7 +32,12 @@
EVENT_IMPORT_OVERLAP,
BibliothecaEventImporter,
)
from palace.manager.integration.license.bibliotheca_purchase_record_importer import (
BibliothecaPurchaseRecordImporter,
)
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.lock import RedisLock
from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.util.http.exception import (
BadResponseException,
Expand Down Expand Up @@ -104,15 +115,12 @@ def import_collection(
) as workflow_lock_acquired:
if not workflow_lock_acquired and is_first_slice:
task.log.warning(
f"Bibliotheca event import skipped for collection {collection_id}: "
"another run is already in progress."
f"Bibliotheca event import skipped for collection {collection_id}: another run is already in progress."
)
return
if not workflow_lock_acquired and not is_first_slice:
task.log.warning(
f"Bibliotheca event import for collection {collection_id}: "
"workflow lock expired between slices; continuing "
"(another run may be active)."
f"Bibliotheca event import for collection {collection_id}: workflow lock expired between slices; continuing (another run may be active)."
)

cutoff = utc_now() - EVENT_IMPORT_OVERLAP
Expand Down Expand Up @@ -152,3 +160,168 @@ def import_collection(
lock_value=lock_value,
)
)


# ---------------------------------------------------------------------------
# Purchase record importer
# ---------------------------------------------------------------------------
Comment on lines +165 to +167
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove comment.



def _purchase_record_workflow_lock(
    client: Redis, collection_id: int, random_value: str
) -> RedisLock:
    """Build the Redis lock that guards a collection's purchase record import workflow.

    The lock key combines the ``PurchaseRecordCollectionWorkflow`` prefix with
    the collection's Redis key. That prefix is intentionally different from the
    one used by ``import_workflow_lock`` so that an event import and a purchase
    record import for the same collection do not block each other.

    :param client: Redis client the lock is created on.
    :param collection_id: Database ID of the collection being locked.
    :param random_value: Value identifying the lock holder.
    :return: A ``RedisLock`` with a two-hour timeout.
    """
    lock_key = [
        "PurchaseRecordCollectionWorkflow",
        Collection.redis_key_from_id(collection_id),
    ]
    two_hours = timedelta(hours=2)
    return RedisLock(
        client,
        lock_key,
        random_value=random_value,
        lock_timeout=two_hours,
    )


@shared_task(queue=QueueNames.default, bind=True)
def import_purchase_records_for_all_collections(task: Task) -> None:
    """Fan out the purchase record import across all Bibliotheca collections.

    Selects every collection that uses the ``BibliothecaAPI`` protocol and
    queues one ``import_purchase_records_by_collection`` task per collection,
    then logs how many collections were queued.
    """
    with task.session() as session:
        license_providers = task.services.integration_registry().license_providers()
        bibliotheca_query = Collection.select_by_protocol(
            BibliothecaAPI, registry=license_providers
        )
        collections = session.scalars(bibliotheca_query).all()

        # One independent task per collection; each one manages its own
        # workflow lock and pagination.
        for bibliotheca_collection in collections:
            import_purchase_records_by_collection.delay(
                collection_id=bibliotheca_collection.id
            )

        task.log.info(
            f"Queued {len(collections)} Bibliotheca collection(s) for purchase record import."
        )


@shared_task(
    queue=QueueNames.default,
    bind=True,
    max_retries=4,
    autoretry_for=(BadResponseException, RequestTimedOut),
    throws=(RemoteIntegrationException,),
    retry_backoff=60,
)
def import_purchase_records_by_collection(
    task: Task,
    collection_id: int,
    *,
    current_day: datetime | None = None,
    offset: int = 1,
    lock_value: str | None = None,
) -> None:
    """Import one page of Bibliotheca MARC purchase records, then chain onward.

    Each invocation asks ``BibliothecaPurchaseRecordImporter.import_day`` to
    handle one page of records for the day starting at ``current_day``
    (beginning at ``offset``), and then replaces itself via ``task.replace()``:

    - with the importer's ``next_offset`` when more pages remain for the
      same day, or
    - with ``current_day`` set to the end of the processed day and ``offset``
      reset to 1 when that day ended before the cutoff.

    The chain stops once the collection has caught up to ``utc_now()``.

    A Redis workflow lock (prefix ``PurchaseRecordCollectionWorkflow``) keyed
    to ``collection_id`` is intended to keep at most one purchase record
    import chain running per collection. The lock is deliberately kept when
    the task exits through ``task.replace()`` or through an exception that
    triggers a Celery retry, so that a transient API failure does not open a
    window for a second concurrent run.

    :param collection_id: Database ID of the Bibliotheca collection.
    :param current_day: Start of the day to process. ``None`` on the first
        invocation, in which case the start comes from the importer's
        ``get_start()``.
    :param offset: Record offset within the current day's result set.
        Defaults to ``1`` (the first page).
    :param lock_value: Identifier of this workflow. Generated as a UUID on
        the first invocation and forwarded unchanged to every subsequent
        page/day task so the chain keeps using the same lock.
    """
    redis_client = task.services.redis().client()

    # A missing lock value means this is the first task of a new chain;
    # replacement tasks always carry the value generated here.
    starts_new_workflow = lock_value is None
    if lock_value is None:
        lock_value = str(uuid4())

    workflow_lock = _purchase_record_workflow_lock(
        redis_client, collection_id, lock_value
    )

    with workflow_lock.lock(
        raise_when_not_acquired=False,
        # Keep holding the lock when leaving via task.replace() (Ignore) or a
        # Celery retry (BadResponseException, RequestTimedOut) so that a
        # transient API failure cannot let a concurrent run start.
        ignored_exceptions=(Ignore, BadResponseException, RequestTimedOut),
    ) as workflow_lock_acquired:
        if not workflow_lock_acquired:
            if starts_new_workflow:
                # Someone else owns the lock: do not start a competing chain.
                task.log.warning(
                    f"Bibliotheca purchase record import skipped for collection {collection_id}: another run is already in progress."
                )
                return
            # Mid-chain we keep going rather than abandon the import.
            task.log.warning(
                f"Bibliotheca purchase record import for collection {collection_id}: workflow lock expired between days; continuing (another run may be active)."
            )

        cutoff = utc_now()
        result = None
        collection_name: str | None = None

        with task.transaction() as session:
            collection = load_from_id(session, Collection, collection_id)
            collection_name = collection.name
            importer = BibliothecaPurchaseRecordImporter(session, collection)

            # First invocation: let the importer decide where to start.
            day = importer.get_start() if current_day is None else current_day

            if day >= cutoff:
                task.log.info(
                    f"Bibliotheca purchase record import: '{collection_name}' is already up to date."
                )
                return

            result = importer.import_day(day, cutoff, offset)

        assert result is not None

        task.log.info(
            f"Bibliotheca purchase record import: handled {result.records_handled} record(s) for "
            f"'{collection_name}' "
            f"({result.day_start.strftime('%Y-%m-%dT%H:%M:%S')} -> "
            f"{result.day_end.strftime('%Y-%m-%dT%H:%M:%S')}, offset {offset})."
        )

        if result.next_offset is not None:
            # More pages remain for the current day.
            next_day, next_offset = day, result.next_offset
        elif result.day_end < cutoff:
            # Current day is complete; advance to the next day.
            next_day, next_offset = result.day_end, 1
        else:
            # Caught up to the cutoff: the chain ends here.
            return

        raise task.replace(
            task.s(
                collection_id=collection_id,
                current_day=next_day,
                offset=next_offset,
                lock_value=lock_value,
            )
        )
Loading
Loading