diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9465c06eb1..ba42598276 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,5 +1,7 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
+default_language_version:
+ python: python3.12
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
diff --git a/bin/bibliotheca_purchase_monitor b/bin/bibliotheca_purchase_monitor
deleted file mode 100755
index fb923b0cdc..0000000000
--- a/bin/bibliotheca_purchase_monitor
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env python
-"""Ask the Bibliotheca API about license purchases, potentially purchases
-that happened many years in the past."""
-
-
-from palace.manager.integration.license.bibliotheca import (
- BibliothecaPurchaseMonitor,
- RunBibliothecaPurchaseMonitorScript,
-)
-
-RunBibliothecaPurchaseMonitorScript(BibliothecaPurchaseMonitor).run()
diff --git a/bin/bibliotheca_purchase_record_import b/bin/bibliotheca_purchase_record_import
new file mode 100755
index 0000000000..d4f43faf42
--- /dev/null
+++ b/bin/bibliotheca_purchase_record_import
@@ -0,0 +1,8 @@
+#!/usr/bin/env python
+"""Manually kick off the Bibliotheca purchase record import for one or all collections."""
+
+from palace.manager.integration.license.bibliotheca_scripts import (
+ ImportPurchaseRecordCollection,
+)
+
+ImportPurchaseRecordCollection().run()
diff --git a/src/palace/manager/celery/tasks/bibliotheca.py b/src/palace/manager/celery/tasks/bibliotheca.py
index d4feeca472..369d13945c 100644
--- a/src/palace/manager/celery/tasks/bibliotheca.py
+++ b/src/palace/manager/celery/tasks/bibliotheca.py
@@ -1,16 +1,22 @@
"""Celery tasks for Bibliotheca (3M Cloud) collection management.
-Two tasks handle near-real-time event import for Bibliotheca collections:
+Four tasks handle near-real-time event import and historical purchase record import:
- ``import_all_collections``: Fans out to one ``import_collection`` task per collection.
- ``import_collection``: Processes one time slice of circulation events, then
re-queues itself via ``task.replace()`` until the collection is caught up, holding
a Redis workflow lock across the chain so at most one run proceeds per collection.
+- ``import_purchase_records_for_all_collections``: Fans out to one
+ ``import_purchase_records_by_collection`` task per collection.
+- ``import_purchase_records_by_collection``: Processes one page of MARC purchase
+ records, then re-queues itself via ``task.replace()`` until the collection is
+ caught up to ``utc_now()``, holding a separate Redis workflow lock so at most one
+ purchase record import run proceeds per collection at a time.
"""
from __future__ import annotations
-from datetime import datetime
+from datetime import datetime, timedelta
from uuid import uuid4
from celery import shared_task
@@ -26,7 +32,12 @@
EVENT_IMPORT_OVERLAP,
BibliothecaEventImporter,
)
+from palace.manager.integration.license.bibliotheca_purchase_record_importer import (
+ BibliothecaPurchaseRecordImporter,
+)
from palace.manager.service.celery.celery import QueueNames
+from palace.manager.service.redis.models.lock import RedisLock
+from palace.manager.service.redis.redis import Redis
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.util.http.exception import (
BadResponseException,
@@ -104,15 +115,12 @@ def import_collection(
) as workflow_lock_acquired:
if not workflow_lock_acquired and is_first_slice:
task.log.warning(
- f"Bibliotheca event import skipped for collection {collection_id}: "
- "another run is already in progress."
+ f"Bibliotheca event import skipped for collection {collection_id}: another run is already in progress."
)
return
if not workflow_lock_acquired and not is_first_slice:
task.log.warning(
- f"Bibliotheca event import for collection {collection_id}: "
- "workflow lock expired between slices; continuing "
- "(another run may be active)."
+ f"Bibliotheca event import for collection {collection_id}: workflow lock expired between slices; continuing (another run may be active)."
)
cutoff = utc_now() - EVENT_IMPORT_OVERLAP
@@ -152,3 +160,168 @@ def import_collection(
lock_value=lock_value,
)
)
+
+
+# ---------------------------------------------------------------------------
+# Purchase record importer
+# ---------------------------------------------------------------------------
+
+
+def _purchase_record_workflow_lock(
+ client: Redis, collection_id: int, random_value: str
+) -> RedisLock:
+ """Create a workflow-level lock for the purchase record importer.
+
+ Uses a key distinct from ``import_workflow_lock`` so that event-import
+ and purchase-record-import runs for the same collection do not block each other.
+ """
+ return RedisLock(
+ client,
+ [
+ "PurchaseRecordCollectionWorkflow",
+ Collection.redis_key_from_id(collection_id),
+ ],
+ random_value=random_value,
+ lock_timeout=timedelta(hours=2),
+ )
+
+
+@shared_task(queue=QueueNames.default, bind=True)
+def import_purchase_records_for_all_collections(task: Task) -> None:
+ """Queue an ``import_purchase_records_by_collection`` task for every Bibliotheca collection."""
+ with task.session() as session:
+ registry = task.services.integration_registry().license_providers()
+ collection_query = Collection.select_by_protocol(
+ BibliothecaAPI, registry=registry
+ )
+ collections = session.scalars(collection_query).all()
+
+ for collection in collections:
+ import_purchase_records_by_collection.delay(collection_id=collection.id)
+
+ task.log.info(
+ f"Queued {len(collections)} Bibliotheca collection(s) for purchase record import."
+ )
+
+
+@shared_task(
+ queue=QueueNames.default,
+ bind=True,
+ max_retries=4,
+ autoretry_for=(BadResponseException, RequestTimedOut),
+ throws=(RemoteIntegrationException,),
+ retry_backoff=60,
+)
+def import_purchase_records_by_collection(
+ task: Task,
+ collection_id: int,
+ *,
+ current_day: datetime | None = None,
+ offset: int = 1,
+ lock_value: str | None = None,
+) -> None:
+ """Process one page of Bibliotheca MARC purchase records.
+
+ Fetches up to 50 MARC records for ``[current_day, min(current_day + 1 day, now)]``
+ starting at ``offset``, creates ``LicensePool`` entries, queues
+ ``bibliographic_apply`` for new or changed titles, then re-queues itself
+ via ``task.replace()``:
+
+ - with ``offset`` advanced when the current page was full (more records
+ remain for the same day), or
+ - with ``current_day`` advanced to the next day and ``offset`` reset to 1
+ when the current day is fully processed.
+
+ This continues until the collection is caught up to ``utc_now()``.
+
+ A Redis workflow lock keyed to ``collection_id`` (prefix
+ ``PurchaseRecordCollectionWorkflow``) ensures at most one purchase-record-import
+ chain runs per collection at a time. The lock is held across ``task.replace()``
+ calls and Celery retries so that a transient API failure does not open a
+ window for a second concurrent run.
+
+ :param collection_id: Database ID of the Bibliotheca collection.
+ :param current_day: Start of the day to process. ``None`` on the first
+ invocation — the start is derived from the stored ``Timestamp``
+ (defaulting to 2014-01-01 when no timestamp exists).
+ :param offset: 1-based record offset within the current day's result set.
+ Defaults to ``1`` (the first page).
+ :param lock_value: UUID identifying this workflow. Generated on the first
+ invocation and forwarded unchanged to every subsequent page/day so the
+ lock is held across ``task.replace()`` calls.
+ """
+ redis = task.services.redis().client()
+
+ is_first_day = lock_value is None
+ if lock_value is None:
+ lock_value = str(uuid4())
+
+ workflow_lock = _purchase_record_workflow_lock(redis, collection_id, lock_value)
+
+ with workflow_lock.lock(
+ raise_when_not_acquired=False,
+ # Hold the lock across task.replace() (Ignore) and Celery retries
+ # (BadResponseException, RequestTimedOut) so a transient API failure
+ # does not open a window for a concurrent run on the same collection.
+ ignored_exceptions=(Ignore, BadResponseException, RequestTimedOut),
+ ) as workflow_lock_acquired:
+ if not workflow_lock_acquired and is_first_day:
+ task.log.warning(
+ f"Bibliotheca purchase record import skipped for collection {collection_id}: another run is already in progress."
+ )
+ return
+ if not workflow_lock_acquired and not is_first_day:
+ task.log.warning(
+ f"Bibliotheca purchase record import for collection {collection_id}: workflow lock expired between days; continuing (another run may be active)."
+ )
+
+ cutoff = utc_now()
+ result = None
+ collection_name: str | None = None
+
+ with task.transaction() as session:
+ collection = load_from_id(session, Collection, collection_id)
+ collection_name = collection.name
+ importer = BibliothecaPurchaseRecordImporter(session, collection)
+
+ if current_day is None:
+ current_day = importer.get_start()
+
+ if current_day >= cutoff:
+ task.log.info(
+ f"Bibliotheca purchase record import: '{collection_name}' is already up to date."
+ )
+ return
+
+ result = importer.import_day(current_day, cutoff, offset)
+
+ assert result is not None
+
+ task.log.info(
+ f"Bibliotheca purchase record import: handled {result.records_handled} record(s) for "
+ f"'{collection_name}' "
+ f"({result.day_start.strftime('%Y-%m-%dT%H:%M:%S')} -> "
+ f"{result.day_end.strftime('%Y-%m-%dT%H:%M:%S')}, offset {offset})."
+ )
+
+ if result.next_offset is not None:
+ # More pages remain for the current day.
+ raise task.replace(
+ task.s(
+ collection_id=collection_id,
+ current_day=current_day,
+ offset=result.next_offset,
+ lock_value=lock_value,
+ )
+ )
+
+ if result.day_end < cutoff:
+ # Current day is complete; advance to the next day.
+ raise task.replace(
+ task.s(
+ collection_id=collection_id,
+ current_day=result.day_end,
+ offset=1,
+ lock_value=lock_value,
+ )
+ )
diff --git a/src/palace/manager/integration/license/bibliotheca.py b/src/palace/manager/integration/license/bibliotheca.py
index a11472e3b7..34e5017967 100644
--- a/src/palace/manager/integration/license/bibliotheca.py
+++ b/src/palace/manager/integration/license/bibliotheca.py
@@ -10,13 +10,11 @@
import time
import urllib.parse
from abc import ABC
-from argparse import ArgumentParser, Namespace
-from collections.abc import Collection as CollectionT, Generator, Iterable, Sequence
+from collections.abc import Collection as CollectionT, Generator, Iterable
from datetime import datetime, timedelta
from io import BytesIO
from typing import Annotated, Any, Literal, Optional, Unpack, overload
-import dateutil.parser
from flask_babel import lazy_gettext as _
from frozendict import frozendict
from lxml.etree import Error, _Element
@@ -25,7 +23,6 @@
from sqlalchemy.orm import Session
from palace.util.datetime_helpers import (
- datetime_utc,
strptime_utc,
to_utc,
utc_now,
@@ -64,12 +61,7 @@
ConfigurationAttributeValue,
)
from palace.manager.core.coverage import BibliographicCoverageProvider, CoverageFailure
-from palace.manager.core.monitor import (
- CollectionMonitor,
- IdentifierSweepMonitor,
- TimelineMonitor,
- TimestampData,
-)
+from palace.manager.core.monitor import IdentifierSweepMonitor
from palace.manager.core.selftest import SelfTestResult
from palace.manager.data_layer.bibliographic import BibliographicData
from palace.manager.data_layer.circulation import CirculationData
@@ -84,13 +76,11 @@
FormFieldType,
FormMetadata,
)
-from palace.manager.scripts.monitor import RunCollectionMonitorScript
from palace.manager.sqlalchemy.constants import DataSourceConstants
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.classification import Classification, Subject
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.contributor import Contributor
-from palace.manager.sqlalchemy.model.coverage import Timestamp
from palace.manager.sqlalchemy.model.datasource import DataSource
from palace.manager.sqlalchemy.model.edition import Edition
from palace.manager.sqlalchemy.model.identifier import Identifier
@@ -103,7 +93,6 @@
from palace.manager.sqlalchemy.model.measurement import Measurement
from palace.manager.sqlalchemy.model.patron import Patron
from palace.manager.sqlalchemy.model.resource import Hyperlink, Representation
-from palace.manager.sqlalchemy.util import get_one
from palace.manager.util import base64
from palace.manager.util.http.exception import RemoteIntegrationException
from palace.manager.util.http.http import HTTP, RequestKwargs
@@ -1316,337 +1305,6 @@ def _process_bibliographic(
)
-class BibliothecaTimelineMonitor(CollectionMonitor, TimelineMonitor):
- """Common superclass for our two TimelineMonitors."""
-
- PROTOCOL = BibliothecaAPI.label()
- LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
-
- def __init__(
- self,
- _db: Session,
- collection: Collection,
- api_class: BibliothecaApiClassT,
- ) -> None:
- """Initializer.
-
- :param _db: Database session object.
- :param collection: Collection for which this monitor operates.
- :param api_class: API class or an instance thereof for this monitor.
- """
- super().__init__(_db, collection)
- if isinstance(api_class, BibliothecaAPI):
- # We were given an actual API object. Just use it.
- self.api = api_class
- else:
- self.api = api_class(_db, collection)
- self.replacement_policy = ReplacementPolicy.from_license_source()
- self.bibliographic_coverage_provider = BibliothecaBibliographicCoverageProvider(
- collection, self.api, replacement_policy=self.replacement_policy
- )
-
-
-class BibliothecaPurchaseMonitor(BibliothecaTimelineMonitor):
- """Track purchases of licenses from Bibliotheca.
-
- Most TimelineMonitors monitor the timeline starting at whatever
- time they're first run. But it's crucial that this monitor start
- at or before the first day on which a book was added to this
- collection, even if that date was years in the past. That's
- because this monitor may be the only time we hear about a
- particular book.
-
- Because of this, this monitor has a very old DEFAULT_START_TIME
- and special capabilities for customizing the start_time to go back
- even further.
- """
-
- SERVICE_NAME = "Bibliotheca Purchase Monitor"
- DEFAULT_START_TIME = datetime_utc(2014, 1, 1)
-
- def __init__(
- self,
- _db: Session,
- collection: Collection,
- api_class: BibliothecaApiClassT = BibliothecaAPI,
- default_start: str | datetime | None = None,
- override_timestamp: bool = False,
- ) -> None:
- """Initializer.
-
- :param _db: Database session object.
- :param collection: Collection for which this monitor operates.
- :param api_class: API class or an instance thereof for this monitor.
- :param default_start: A default date/time at which to start
- requesting events. It should be specified as a `datetime` or
- an ISO 8601 string. If not provided, the monitor's calculated
- intrinsic default will be used.
- :param override_timestamp: Boolean indicating whether
- `default_start` should take precedence over the timestamp
- for an already initialized monitor.
- """
- super().__init__(_db=_db, collection=collection, api_class=api_class)
-
- # We should only force the use of `default_start` as the actual
- # start time if it was passed in.
- self.override_timestamp = override_timestamp if default_start else False
- # A specified `default_start` takes precedence over the
- # monitor's intrinsic default start date/time.
- self.default_start_time = self._optional_iso_date(
- default_start
- ) or self._intrinsic_start_time(_db)
-
- def _optional_iso_date(self, date: str | datetime | None) -> datetime | None:
- """Return the date in `datetime` format.
-
- :param date: A date/time value, specified as either an ISO 8601
- string or as a `datetime`.
- :return: Optional datetime.
- """
- if date is None or isinstance(date, datetime):
- return to_utc(date)
- try:
- dt_date = to_utc(dateutil.parser.isoparse(date))
- except ValueError as e:
- self.log.warning(
- '%r. Date argument "%s" was not in a valid format. Use an ISO 8601 string or a datetime.',
- e,
- date,
- )
- raise
- return dt_date
-
- def _intrinsic_start_time(self, _db: Session) -> datetime:
- """Return the intrinsic start time for this monitor.
-
- The intrinsic start time is the time at which this monitor would
- start if it were uninitialized (no timestamp) and no `default_start`
- parameter were supplied. It is `self.DEFAULT_START_TIME`.
-
- :param _db: Database session object.
-
- :return: datetime representing a default start time.
- """
- # We don't use Monitor.timestamp() because that will create
- # the timestamp if it doesn't exist -- we want to see whether
- # or not it exists.
- default_start_time = self.DEFAULT_START_TIME
- initialized = get_one(
- _db,
- Timestamp,
- service=self.service_name,
- service_type=Timestamp.MONITOR_TYPE,
- collection=self.collection,
- )
- if not initialized:
- self.log.info(
- "Initializing %s from date: %s.",
- self.service_name,
- default_start_time.strftime(self.LOG_DATE_FORMAT),
- )
- return default_start_time
-
- def timestamp(self) -> Timestamp:
- """Find or create a Timestamp for this Monitor.
-
- If we are overriding the normal start time with one supplied when
- the this class was instantiated, we do that here. The instance's
- `default_start_time` will have been set to the specified datetime
- and setting`timestamp.finish` to None will cause the default to
- be used.
- """
- timestamp = super().timestamp()
- if self.override_timestamp:
- self.log.info(
- "Overriding timestamp and starting at %s.",
- datetime.strftime(self.default_start_time, self.LOG_DATE_FORMAT),
- )
- timestamp.finish = None
- return timestamp # type: ignore[no-any-return]
-
- def catch_up_from(
- self, start: datetime, cutoff: datetime, progress: TimestampData
- ) -> None:
- """Ask the Bibliotheca API about new purchases for every
- day between `start` and `cutoff`.
-
- :param start: The first day to ask about.
- :param cutoff: The last day to ask about.
- :param progress: Object used to record progress through the timeline.
- """
- num_records = 0
- # Ask the Bibliotheca API for one day of data at a time. This
- # ensures that TITLE_ADD events are associated with the day
- # the license was purchased.
- today = utc_now().date()
- achievement_template = "MARC records processed: %s"
- for slice_start, slice_end, is_full_slice in self.slice_timespan(
- start, cutoff, timedelta(days=1)
- ):
- for record in self.purchases(slice_start, slice_end):
- self.process_record(record, slice_start)
- num_records += 1
- if isinstance(slice_end, datetime):
- slice_end_as_date = slice_end.date()
- else:
- slice_end_as_date = slice_end
- if is_full_slice and slice_end_as_date < today:
- # We have finished processing a date in the past.
- # There can never be more licenses purchased for that
- # day. Treat this as a checkpoint.
- #
- # We're playing it safe by using slice_start instead
- # of slice_end here -- slice_end should be fine.
- self._checkpoint(
- progress, start, slice_start, achievement_template % num_records
- )
- # We're all caught up. The superclass will take care of
- # finalizing the dates, so there's no need to explicitly
- # set a checkpoint.
- progress.achievements = achievement_template % num_records
-
- def _checkpoint(
- self,
- progress: TimestampData,
- start: datetime,
- finish: datetime,
- achievements: str,
- ) -> None:
- """Set the monitor's progress so that if it crashes later on it will
- start from this point, reducing duplicate work.
-
- This is especially important for this monitor, which usually
- starts several years in the past. TODO: However it might be
- useful to make this a general feature of TimelineMonitor.
-
- :param progress: Object used to record progress through the timeline.
-
- :param start: New value for `progress.start`
- :param finish: New value for `progress.finish`
- :param achievements: The monitor's achievements thus far.
- """
- progress.start = start
- progress.finish = finish
- progress.achievements = achievements
- progress.finalize(
- service=self.service_name,
- service_type=Timestamp.MONITOR_TYPE,
- collection=self.collection,
- )
- progress.apply(self._db)
- self._db.commit()
-
- def purchases(self, start: datetime, end: datetime) -> Generator[Record]:
- """Ask Bibliotheca for a MARC record for each book purchased
- between `start` and `end`.
-
- :yield: A sequence of pymarc Record objects
- """
- offset = 1 # Smallest allowed offset
- page_size = 50 # Maximum supported size.
- records = None
- while records is None or len(records) >= page_size:
- records = [x for x in self.api.marc_request(start, end, offset, page_size)]
- yield from records
- offset += page_size
-
- def process_record(
- self, record: Record, purchase_time: datetime
- ) -> LicensePool | None:
- """Record the purchase of a new title.
-
- :param record: Bibliographic information about the new title.
- :param purchase_time: Put down this time as the time the
- purchase happened.
-
- :return: A LicensePool representing the new title.
- """
- # The control number associated with the MARC record is what
- # we call the Bibliotheca ID.
- control_numbers = [x for x in record.fields if x.tag == "001"]
- # These errors should not happen in real usage.
- error = None
- if not control_numbers:
- error = "Ignoring MARC record with no Bibliotheca control number."
- elif len(control_numbers) > 1:
- error = "Ignoring MARC record with multiple Bibliotheca control numbers."
- if error is not None:
- self.log.error(error + " " + record.as_json())
- return None
-
- # At this point we know there is one and only one control
- # number.
- bibliotheca_id = control_numbers[0].value()
-
- # Find or lookup a LicensePool from the control number.
- license_pool, is_new = LicensePool.for_foreign_id(
- self._db,
- self.api.data_source,
- Identifier.BIBLIOTHECA_ID,
- bibliotheca_id,
- collection=self.collection,
- )
-
- if is_new:
- # We've never seen this book before. Immediately acquire
- # bibliographic coverage for it. This will set the
- # DistributionMechanisms and make the book
- # presentation-ready.
- #
- # We have most of the bibliographic information in the
- # MARC record itself, but using the
- # BibliographicCoverageProvider saves code and also gives
- # us up-to-date circulation information.
- coverage_record = self.bibliographic_coverage_provider.ensure_coverage(
- license_pool.identifier, force=True
- )
-
- return license_pool
-
-
-class RunBibliothecaPurchaseMonitorScript(RunCollectionMonitorScript):
- """Adds the ability to specify a particular start date for the
- BibliothecaPurchaseMonitor. This is important because for a given
- collection, the start date needs to be before books
- started being licensed into that collection.
- """
-
- @classmethod
- def arg_parser(cls, _db: Session) -> ArgumentParser:
- parser = super().arg_parser(_db)
- parser.add_argument(
- "--default-start",
- metavar="DATETIME",
- default=None,
- type=dateutil.parser.isoparse,
- help="Default start date/time to be used for uninitialized (no timestamp) monitors."
- ' Use ISO 8601 format (e.g., "yyyy-mm-dd", "yyyy-mm-ddThh:mm:ss").'
- " Do not specify a time zone or offset.",
- )
- parser.add_argument(
- "--override-timestamp",
- action="store_true",
- help="Use the specified `--default-start` as the actual"
- " start date, even if a monitor is already initialized.",
- )
- return parser
-
- @classmethod
- def parse_command_line(
- cls,
- _db: Session,
- cmd_args: Sequence[str | None] | None = None,
- *args: Any,
- **kwargs: Any,
- ) -> Namespace:
- parsed = super().parse_command_line(_db, cmd_args, *args, **kwargs)
- if parsed.override_timestamp and not parsed.default_start:
- cls.arg_parser(_db).error(
- '"--override-timestamp" is valid only when "--default-start" is also specified.'
- )
- return parsed
-
-
class BibliothecaBibliographicCoverageProvider(BibliographicCoverageProvider):
"""Fill in bibliographic metadata for Bibliotheca records.
diff --git a/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py b/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py
new file mode 100644
index 0000000000..5e14ce9eb6
--- /dev/null
+++ b/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py
@@ -0,0 +1,205 @@
+"""Importer for Bibliotheca (3M Cloud) MARC purchase records."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+
+from pymarc import Record
+from sqlalchemy.orm import Session
+
+from palace.util.datetime_helpers import datetime_utc
+from palace.util.log import LoggerMixin
+
+from palace.manager.celery.tasks import apply
+from palace.manager.data_layer.policy.replacement import ReplacementPolicy
+from palace.manager.integration.license.bibliotheca import BibliothecaAPI
+from palace.manager.sqlalchemy.model.collection import Collection
+from palace.manager.sqlalchemy.model.coverage import Timestamp
+from palace.manager.sqlalchemy.model.identifier import Identifier
+from palace.manager.sqlalchemy.model.licensing import LicensePool
+
+PURCHASE_RECORD_SERVICE_NAME = "Bibliotheca Purchase Record Importer"
+
+# The importer starts from this date when no prior Timestamp exists.
+# Bibliotheca collections typically go back to 2014-01-01.
+DEFAULT_PURCHASE_RECORD_START_TIME = datetime_utc(2014, 1, 1)
+
+_LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
+
+# Maximum number of MARC records per API page (Bibliotheca hard limit).
+_MARC_PAGE_SIZE = 50
+
+
+@dataclass(frozen=True)
+class DayImportResult:
+ """Result of importing one page of Bibliotheca MARC purchase records.
+
+ :param records_handled: Number of MARC records processed in this page.
+ :param day_start: Start of the day being processed (inclusive).
+ :param day_end: End of the day window (exclusive).
+ Equal to ``min(day_start + 1 day, cutoff)``.
+ :param next_offset: Offset to pass to the next ``import_day`` call for
+ the same day, or ``None`` when the page was smaller than the maximum
+ (meaning the day is fully processed and the caller should advance to
+ ``day_end``).
+ """
+
+ records_handled: int
+ day_start: datetime
+ day_end: datetime
+ next_offset: int | None
+
+
+class BibliothecaPurchaseRecordImporter(LoggerMixin):
+ """Imports Bibliotheca MARC purchase records one page at a time.
+
+ Each call to :meth:`import_day` processes one API page (up to
+ :data:`_MARC_PAGE_SIZE` records) for a given day. Callers advance
+ through pages of the same day (via ``next_offset``) and then advance
+ to the next day (via ``day_end``) until the collection is caught up
+ to the cutoff.
+ """
+
+ def __init__(
+ self,
+ session: Session,
+ collection: Collection,
+ api: BibliothecaAPI | None = None,
+ ) -> None:
+ """
+ :param session: Database session.
+ :param collection: The Bibliotheca collection to import records for.
+ :param api: Optional pre-constructed API instance; created from
+ ``session`` and ``collection`` if not supplied.
+ """
+ self._session = session
+ self._collection = collection
+ self._api = api or BibliothecaAPI(session, collection)
+
+ def get_start(self) -> datetime:
+ """Return the start of the next day to import, as recorded in the stored ``Timestamp``.
+
+ Falls back to :data:`DEFAULT_PURCHASE_RECORD_START_TIME` (2014-01-01) when no
+ prior run has been recorded, so the first run begins a full historical
+ backfill from the earliest possible purchase record date.
+
+ :returns: The start datetime for the next import day.
+ """
+ timestamp = Timestamp.lookup(
+ self._session,
+ PURCHASE_RECORD_SERVICE_NAME,
+ Timestamp.MONITOR_TYPE,
+ self._collection,
+ )
+ if timestamp is None or timestamp.finish is None:
+ return DEFAULT_PURCHASE_RECORD_START_TIME
+ finish: datetime = timestamp.finish
+ return finish
+
+ def import_day(
+ self, current_day: datetime, cutoff: datetime, offset: int = 1
+ ) -> DayImportResult:
+ """Import one page of MARC purchase records for a given day.
+
+ Fetches up to :data:`_MARC_PAGE_SIZE` records from the window
+ ``[current_day, day_end]`` starting at ``offset``, where
+ ``day_end = min(current_day + 1 day, cutoff)``.
+
+ The ``Timestamp`` is always updated after the page is processed:
+ to ``current_day`` while the day is still in progress (so a restart
+ after a crash resumes from the beginning of this day rather than an
+ earlier day), and to ``day_end`` once the page is smaller than the
+ maximum (signalling that the day is fully processed).
+
+ :param current_day: Start of the day to process.
+ :param cutoff: Upper bound of the import window; the day will not
+ extend past this point.
+ :param offset: 1-based record offset within the day's result set.
+ Defaults to ``1`` (the first page).
+ :returns: A :class:`DayImportResult` describing what was processed.
+ Check :attr:`~DayImportResult.next_offset` to determine whether
+ to re-queue for the same day or advance to the next.
+ """
+ day_end = min(current_day + timedelta(days=1), cutoff)
+
+ self.log.info(
+ f"Bibliotheca purchase record import: requesting MARC records for "
+ f"'{self._collection.name}' between "
+ f"{current_day.strftime(_LOG_DATE_FORMAT)} and "
+ f"{day_end.strftime(_LOG_DATE_FORMAT)}, offset {offset}."
+ )
+
+ records = list(
+ self._api.marc_request(current_day, day_end, offset, _MARC_PAGE_SIZE)
+ )
+ for record in records:
+ self._process_record(record, current_day)
+
+ records_handled = len(records)
+ day_complete = records_handled < _MARC_PAGE_SIZE
+ next_offset = None if day_complete else offset + _MARC_PAGE_SIZE
+
+ # Always checkpoint: advance finish to day_end when the day is done,
+ # or to current_day while still in progress, so a restart after a
+ # crash resumes from this day rather than the previous one.
+ Timestamp.stamp(
+ self._session,
+ service=PURCHASE_RECORD_SERVICE_NAME,
+ service_type=Timestamp.MONITOR_TYPE,
+ collection=self._collection,
+ start=current_day,
+ finish=day_end if day_complete else current_day,
+ achievements=f"MARC records processed: {records_handled}.",
+ )
+
+ return DayImportResult(
+ records_handled=records_handled,
+ day_start=current_day,
+ day_end=day_end,
+ next_offset=next_offset,
+ )
+
+ def _process_record(self, record: Record, purchase_record_time: datetime) -> None:
+ """Process a single Bibliotheca MARC purchase record.
+
+ Extracts the Bibliotheca ID from MARC field ``001``, creates or finds
+ the ``LicensePool``, then queues a ``bibliographic_apply`` task when
+ the title's metadata has changed (hash-based deduplication).
+
+ :param record: A pymarc ``Record`` representing one purchased title.
+ :param purchase_record_time: Start of the day being imported; used only in the log message.
+ """
+ control_numbers = [f for f in record.fields if f.tag == "001"]
+ if not control_numbers:
+ self.log.error(
+ f"Ignoring MARC record with no Bibliotheca control number. {record.as_json()}"
+ )
+ return
+ if len(control_numbers) > 1:
+ self.log.error(
+ f"Ignoring MARC record with multiple Bibliotheca control numbers. {record.as_json()}"
+ )
+ return
+
+ bibliotheca_id = control_numbers[0].value()
+
+ LicensePool.for_foreign_id(
+ self._session,
+ self._api.data_source,
+ Identifier.BIBLIOTHECA_ID,
+ bibliotheca_id,
+ collection=self._collection,
+ )
+
+ for bibliographic in self._api.bibliographic_lookup(bibliotheca_id):
+ if bibliographic.needs_apply(self._session):
+ apply.bibliographic_apply.delay(
+ bibliographic,
+ collection_id=self._collection.id,
+ replace=ReplacementPolicy.from_license_source(),
+ )
+
+ self.log.info(
+ f"{purchase_record_time.strftime(_LOG_DATE_FORMAT)}: processed purchase record for Bibliotheca ID {bibliotheca_id}"
+ )
diff --git a/src/palace/manager/integration/license/bibliotheca_scripts.py b/src/palace/manager/integration/license/bibliotheca_scripts.py
index c2651a879d..6ffe13513f 100644
--- a/src/palace/manager/integration/license/bibliotheca_scripts.py
+++ b/src/palace/manager/integration/license/bibliotheca_scripts.py
@@ -51,3 +51,47 @@ def do_run(self, cmd_args: list[str] | None = None) -> None:
self.log.info(
f"Queued event import for Bibliotheca collection '{collection.name}'."
)
+
+
+class ImportPurchaseRecordCollection(Script):
+ """Manually kick off the Bibliotheca purchase record import for one or all collections."""
+
+ @classmethod
+ def arg_parser(cls, _db: Session) -> argparse.ArgumentParser:
+ parser = argparse.ArgumentParser(
+ description="Kick off the Bibliotheca purchase record import Celery task."
+ )
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument(
+ "--collection",
+ type=str,
+ metavar="NAME",
+ help="Name of the Bibliotheca collection to import.",
+ )
+ group.add_argument(
+ "--import-all",
+ action="store_true",
+ help="Queue the purchase record import for every Bibliotheca collection.",
+ )
+ return parser
+
+ def do_run(self, cmd_args: list[str] | None = None) -> None:
+ parsed = self.parse_command_line(self._db, cmd_args=cmd_args)
+
+ if parsed.import_all:
+ bibliotheca.import_purchase_records_for_all_collections.delay()
+ self.log.info(
+ "Queued purchase record import for all Bibliotheca collections."
+ )
+ return
+
+ collection = Collection.by_name(self._db, parsed.collection)
+ if not collection:
+ raise PalaceValueError(f'No collection found named "{parsed.collection}".')
+
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ )
+ self.log.info(
+ f"Queued purchase record import for Bibliotheca collection '{collection.name}'."
+ )
diff --git a/src/palace/manager/service/celery/celery.py b/src/palace/manager/service/celery/celery.py
index 09cf47ed36..69818de431 100644
--- a/src/palace/manager/service/celery/celery.py
+++ b/src/palace/manager/service/celery/celery.py
@@ -300,6 +300,10 @@ def beat_schedule() -> dict[str, Any]:
"task": bibliotheca.import_all_collections.name,
"schedule": crontab(minute="0"), # Once an hour
},
+ "bibliotheca_import_purchase_records_for_all_collections": {
+ "task": bibliotheca.import_purchase_records_for_all_collections.name,
+ "schedule": crontab(minute="0", hour="4"), # Once a day at 4:00 AM
+ },
}
diff --git a/tests/manager/celery/tasks/test_bibliotheca.py b/tests/manager/celery/tasks/test_bibliotheca.py
index 045445ffcf..d46734335a 100644
--- a/tests/manager/celery/tasks/test_bibliotheca.py
+++ b/tests/manager/celery/tasks/test_bibliotheca.py
@@ -8,15 +8,19 @@
import pytest
-from palace.util.datetime_helpers import utc_now
+from palace.util.datetime_helpers import datetime_utc, utc_now
from palace.util.log import LogLevel
from palace.manager.celery.importer import import_workflow_lock
from palace.manager.celery.tasks import apply, bibliotheca
+from palace.manager.celery.tasks.bibliotheca import _purchase_record_workflow_lock
from palace.manager.integration.license.bibliotheca import BibliothecaAPI
from palace.manager.integration.license.bibliotheca_importer import (
EVENT_IMPORT_SERVICE_NAME,
)
+from palace.manager.integration.license.bibliotheca_purchase_record_importer import (
+ PURCHASE_RECORD_SERVICE_NAME,
+)
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.collection import Collection
from palace.manager.sqlalchemy.model.coverage import Timestamp
@@ -513,3 +517,488 @@ def test_multiple_collections_each_get_own_lock(
assert mock_api_cls2.return_value.get_events_between.call_count == 1
lock_c1.release()
+
+
+# ---------------------------------------------------------------------------
+# Purchase record importer tests
+# ---------------------------------------------------------------------------
+
+
+class BibliothecaPurchaseRecordTaskFixture:
+ """Common setup for Bibliotheca purchase record importer Celery task tests."""
+
+ def __init__(self, db: DatabaseTransactionFixture) -> None:
+ self.db = db
+ self.collection = MockBibliothecaAPI.mock_collection(
+ db.session, db.default_library()
+ )
+
+ def stamp_purchase_record(
+ self, finish: datetime | None = None, collection: Collection | None = None
+ ) -> Timestamp:
+ """Create (or update) the purchase record Timestamp for the collection."""
+ return Timestamp.stamp(
+ self.db.session,
+ service=PURCHASE_RECORD_SERVICE_NAME,
+ service_type=Timestamp.MONITOR_TYPE,
+ collection=collection or self.collection,
+ finish=finish or utc_now(),
+ )
+
+ def get_purchase_record_timestamp(
+ self, collection: Collection | None = None
+ ) -> Timestamp | None:
+ return Timestamp.lookup(
+ self.db.session,
+ PURCHASE_RECORD_SERVICE_NAME,
+ Timestamp.MONITOR_TYPE,
+ collection or self.collection,
+ )
+
+
+@pytest.fixture
+def bibliotheca_purchase_record_task_fixture(
+ db: DatabaseTransactionFixture,
+) -> BibliothecaPurchaseRecordTaskFixture:
+ return BibliothecaPurchaseRecordTaskFixture(db)
+
+
+class TestImportPurchaseRecordsForAllCollections:
+ def test_queues_purchase_record_collection_for_each_bibliotheca_collection(
+ self,
+ db: DatabaseTransactionFixture,
+ celery_fixture: CeleryFixture,
+ ) -> None:
+ """import_purchase_records_for_all_collections queues import_purchase_records_by_collection for every Bibliotheca
+ collection and ignores collections using other protocols."""
+ db.default_collection() # non-Bibliotheca, should be ignored
+ c1 = MockBibliothecaAPI.mock_collection(
+ db.session, db.default_library(), name="Bibliotheca 1"
+ )
+ c2 = MockBibliothecaAPI.mock_collection(
+ db.session, db.default_library(), name="Bibliotheca 2"
+ )
+
+ with patch.object(
+ bibliotheca, "import_purchase_records_by_collection"
+ ) as mock_task:
+ bibliotheca.import_purchase_records_for_all_collections.delay().wait()
+
+ mock_task.delay.assert_has_calls(
+ [call(collection_id=c1.id), call(collection_id=c2.id)],
+ any_order=True,
+ )
+ assert mock_task.delay.call_count == 2
+
+ def test_no_bibliotheca_collections(
+ self,
+ db: DatabaseTransactionFixture,
+ celery_fixture: CeleryFixture,
+ ) -> None:
+ """import_purchase_records_for_all_collections is a no-op when there are no Bibliotheca collections."""
+ db.default_collection() # non-Bibliotheca
+
+ with patch.object(
+ bibliotheca, "import_purchase_records_by_collection"
+ ) as mock_task:
+ bibliotheca.import_purchase_records_for_all_collections.delay().wait()
+
+ mock_task.delay.assert_not_called()
+
+
+class TestImportPurchaseRecordsByCollection:
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_first_run_starts_from_default_start_time(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """On the first run (no prior Timestamp), the task starts from 2014-01-01."""
+ mock_api = mock_api_cls.return_value
+ mock_api.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ # No Timestamp is stamped here; abort at task.replace() so we can inspect the start passed to marc_request.
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+
+ call_args = mock_api.marc_request.call_args_list[0]
+ slice_start, _ = call_args.args[:2]
+ expected_start = datetime_utc(2014, 1, 1)
+ assert abs((slice_start - expected_start).total_seconds()) < 1
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_starts_from_stored_timestamp(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """The task starts from timestamp.finish when a prior Timestamp exists."""
+ mock_api = mock_api_cls.return_value
+ mock_api.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ stored_finish = datetime_utc(2024, 3, 10)
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=stored_finish
+ )
+
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+
+ call_args = mock_api.marc_request.call_args_list[0]
+ slice_start, _ = call_args.args[:2]
+ assert abs((slice_start - stored_finish).total_seconds()) < 1
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_already_up_to_date(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """When the stored Timestamp is current (finish >= now), no API call is made."""
+ collection = bibliotheca_purchase_record_task_fixture.collection
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() + timedelta(minutes=10)
+ )
+
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+
+ mock_api_cls.return_value.marc_request.assert_not_called()
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_replaces_when_more_days_remain(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """When current_day + 1 day is still behind cutoff, task.replace() is called
+ with the next day and the same lock_value."""
+ mock_api_cls.return_value.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ # Stamp a timestamp several days in the past so multiple days are needed.
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() - timedelta(days=5)
+ )
+
+ lock_value = str(uuid4())
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id,
+ lock_value=lock_value,
+ ).wait()
+
+ replace_sig = mock_replace.call_args[0][0]
+ assert replace_sig.kwargs["lock_value"] == lock_value
+ assert replace_sig.kwargs["current_day"] is not None
+ assert replace_sig.kwargs["offset"] == 1
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_replaces_with_next_offset_when_page_full(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """When a full page (50 records) is returned, task.replace() is called with
+ the same current_day and the next offset rather than advancing to the next day.
+ """
+ from palace.manager.integration.license.bibliotheca_purchase_record_importer import (
+ _MARC_PAGE_SIZE,
+ )
+
+ full_page_records = [MagicMock() for _ in range(_MARC_PAGE_SIZE)]
+ for record in full_page_records:
+ record.fields = []
+ record.as_json.return_value = "{}"
+
+ mock_api_cls.return_value.marc_request.return_value = iter(full_page_records)
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ stored_finish = utc_now() - timedelta(days=5)
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=stored_finish
+ )
+
+ lock_value = str(uuid4())
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id,
+ offset=1,
+ lock_value=lock_value,
+ ).wait()
+
+ replace_sig = mock_replace.call_args[0][0]
+ # Same day, offset advanced by _MARC_PAGE_SIZE.
+ assert (
+ abs((replace_sig.kwargs["current_day"] - stored_finish).total_seconds()) < 5
+ )
+ assert replace_sig.kwargs["offset"] == 1 + _MARC_PAGE_SIZE
+ assert replace_sig.kwargs["lock_value"] == lock_value
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_no_replace_on_last_day(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """When current_day + 1 day >= cutoff, task.replace() is not called."""
+ mock_api_cls.return_value.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ # Timestamp just a few hours old — fits inside one day.
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() - timedelta(hours=3)
+ )
+
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+
+ mock_replace.assert_not_called()
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_lock_value_passed_through_on_replace(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """The lock_value generated on the first day is forwarded unchanged to the
+ next day so the workflow lock remains held across task.replace() calls."""
+ mock_api_cls.return_value.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() - timedelta(days=5)
+ )
+
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+
+ replace_sig = mock_replace.call_args[0][0]
+ lock_value = replace_sig.kwargs["lock_value"]
+ assert lock_value is not None
+ assert isinstance(lock_value, str)
+
+ def test_skips_when_lock_held(
+ self,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ caplog: pytest.LogCaptureFixture,
+ ) -> None:
+ """When the purchase record workflow lock is already held, the task logs a warning
+ and returns without making any API calls."""
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ existing_lock = _purchase_record_workflow_lock(
+ redis_fixture.client, collection.id, str(uuid4())
+ )
+ existing_lock.acquire()
+
+ caplog.set_level(LogLevel.warning)
+
+ with patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ ) as mock_api_cls:
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).wait()
+ mock_api_cls.return_value.marc_request.assert_not_called()
+
+ assert "skipped" in caplog.text
+ assert "already in progress" in caplog.text
+
+ existing_lock.release()
+
+ def test_continues_with_warning_when_lock_expires_mid_chain(
+ self,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ caplog: pytest.LogCaptureFixture,
+ ) -> None:
+ """When the purchase record workflow lock expires between days (is_first_day=False
+ and lock not acquired), the task logs a warning but still processes the day."""
+ collection = bibliotheca_purchase_record_task_fixture.collection
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() - timedelta(hours=3)
+ )
+
+ competing_lock = _purchase_record_workflow_lock(
+ redis_fixture.client, collection.id, str(uuid4())
+ )
+ competing_lock.acquire()
+
+ caplog.set_level(LogLevel.warning)
+
+ with patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ ) as mock_api_cls:
+ mock_api_cls.return_value.marc_request.return_value = iter([])
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id,
+ lock_value=str(uuid4()),
+ ).wait()
+ mock_api_cls.return_value.marc_request.assert_called_once()
+
+ assert "workflow lock expired between days" in caplog.text
+
+ competing_lock.release()
+
+ def test_lock_not_released_on_autoretry(
+ self,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """When a retryable exception is raised the purchase record workflow lock is held so
+ that a concurrent run cannot start on the same collection while retries are
+ in progress."""
+ collection = bibliotheca_purchase_record_task_fixture.collection
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=utc_now() - timedelta(days=5)
+ )
+
+ mock_response = MockRequestsResponse(500, content="Internal Server Error")
+
+ with patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ ) as mock_api_cls:
+ mock_api_cls.return_value.marc_request.side_effect = BadResponseException(
+ "http://test.com", "Bad response", mock_response
+ )
+
+ with celery_fixture.patch_retry_backoff():
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id
+ ).get(propagate=False)
+
+ workflow_lock = _purchase_record_workflow_lock(
+ redis_fixture.client, collection.id, random_value="any"
+ )
+ assert workflow_lock.locked()
+
+ @patch(
+ "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI"
+ )
+ def test_timestamp_updated_after_each_day(
+ self,
+ mock_api_cls: MagicMock,
+ bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """The Timestamp.finish is advanced by one day after each task invocation,
+ enabling crash recovery without re-processing old records."""
+ mock_api_cls.return_value.marc_request.return_value = iter([])
+ collection = bibliotheca_purchase_record_task_fixture.collection
+
+ stored_finish = datetime_utc(2024, 3, 10)
+ bibliotheca_purchase_record_task_fixture.stamp_purchase_record(
+ finish=stored_finish
+ )
+
+ lock_value = str(uuid4())
+ with patch.object(
+ bibliotheca.import_purchase_records_by_collection, "replace"
+ ) as mock_replace:
+ mock_replace.side_effect = Exception("replaced")
+ with pytest.raises(Exception, match="replaced"):
+ bibliotheca.import_purchase_records_by_collection.delay(
+ collection_id=collection.id,
+ lock_value=lock_value,
+ ).wait()
+
+ ts = bibliotheca_purchase_record_task_fixture.get_purchase_record_timestamp()
+ assert ts is not None
+ assert ts.finish is not None
+ expected_finish = stored_finish + timedelta(days=1)
+ assert abs((ts.finish - expected_finish).total_seconds()) < 5
+
+ def test_purchase_record_lock_independent_from_import_lock(
+ self,
+ db: DatabaseTransactionFixture,
+ celery_fixture: CeleryFixture,
+ redis_fixture: RedisFixture,
+ ) -> None:
+ """The purchase record workflow lock uses a different Redis key than the event
+ import workflow lock, so the two can run concurrently per collection."""
+ collection = MockBibliothecaAPI.mock_collection(
+ db.session, db.default_library()
+ )
+
+ # Hold the event import lock.
+ event_lock = import_workflow_lock(
+ redis_fixture.client, collection.id, str(uuid4())
+ )
+ event_lock.acquire()
+
+ # The purchase record lock for the same collection should still be acquirable.
+ purchase_record_lock = _purchase_record_workflow_lock(
+ redis_fixture.client, collection.id, str(uuid4())
+ )
+ acquired = purchase_record_lock.acquire()
+ assert acquired
+
+ purchase_record_lock.release()
+ event_lock.release()
diff --git a/tests/manager/integration/license/test_bibliotheca.py b/tests/manager/integration/license/test_bibliotheca.py
index 3a3265fd23..2a7f980eae 100644
--- a/tests/manager/integration/license/test_bibliotheca.py
+++ b/tests/manager/integration/license/test_bibliotheca.py
@@ -2,14 +2,11 @@
import json
import random
-from datetime import date, datetime, timedelta
-from io import BytesIO, StringIO
+from datetime import date, timedelta
from typing import TYPE_CHECKING, cast
-from unittest import mock
-from unittest.mock import MagicMock, create_autospec
+from unittest.mock import create_autospec
import pytest
-from pymarc import parse_xml_to_array
from pymarc.record import Record
from palace.util.datetime_helpers import datetime_utc, utc_now
@@ -32,13 +29,11 @@
)
from palace.manager.api.circulation.fulfillment import Fulfillment
from palace.manager.api.web_publication_manifest import FindawayManifest
-from palace.manager.core.monitor import TimestampData
from palace.manager.integration.license.bibliotheca import (
BibliothecaAPI,
BibliothecaBibliographicCoverageProvider,
BibliothecaCirculationSweep,
BibliothecaParser,
- BibliothecaPurchaseMonitor,
CheckoutResponseParser,
ErrorParser,
EventParser,
@@ -49,19 +44,16 @@
from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent
from palace.manager.sqlalchemy.model.classification import Subject
from palace.manager.sqlalchemy.model.contributor import Contributor
-from palace.manager.sqlalchemy.model.coverage import Timestamp
from palace.manager.sqlalchemy.model.datasource import DataSource
from palace.manager.sqlalchemy.model.edition import Edition
from palace.manager.sqlalchemy.model.identifier import Identifier
from palace.manager.sqlalchemy.model.licensing import (
DeliveryMechanism,
- LicensePool,
LicensePoolDeliveryMechanism,
LicensePoolStatus,
)
from palace.manager.sqlalchemy.model.measurement import Measurement
from palace.manager.sqlalchemy.model.resource import Hyperlink, Representation
-from palace.manager.sqlalchemy.model.work import Work
from palace.manager.util.http.exception import (
BadResponseException,
RemoteIntegrationException,
@@ -1022,529 +1014,6 @@ def test_parse_event_batch(self):
assert correct_end == end_time
-class TestBibliothecaPurchaseMonitor:
- @pytest.fixture()
- def default_monitor(self, bibliotheca_fixture: BibliothecaAPITestFixture):
- return BibliothecaPurchaseMonitor(
- bibliotheca_fixture.db.session,
- bibliotheca_fixture.collection,
- api_class=MockBibliothecaAPI,
- )
-
- @pytest.fixture()
- def initialized_monitor(self, db: DatabaseTransactionFixture):
- collection = MockBibliothecaAPI.mock_collection(
- db.session,
- db.default_library(),
- name="Initialized Purchase Monitor Collection",
- )
- monitor = BibliothecaPurchaseMonitor(
- db.session, collection, api_class=MockBibliothecaAPI
- )
- Timestamp.stamp(
- db.session,
- service=monitor.service_name,
- service_type=Timestamp.MONITOR_TYPE,
- collection=collection,
- )
- return monitor
-
- @pytest.mark.parametrize(
- "specified_default_start, expected_default_start",
- [
- ("2011", datetime_utc(year=2011, month=1, day=1)),
- ("2011-10", datetime_utc(year=2011, month=10, day=1)),
- ("2011-10-05", datetime_utc(year=2011, month=10, day=5)),
- ("2011-10-05T15", datetime_utc(year=2011, month=10, day=5, hour=15)),
- (
- "2011-10-05T15:27",
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (
- "2011-10-05T15:27:33",
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- (
- "2011-10-05 15:27:33",
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- (
- "2011-10-05T15:27:33.123456",
- datetime_utc(
- year=2011,
- month=10,
- day=5,
- hour=15,
- minute=27,
- second=33,
- microsecond=123456,
- ),
- ),
- (
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (None, None),
- ],
- )
- def test_optional_iso_date_valid_dates(
- self,
- specified_default_start: datetime | str | None,
- expected_default_start: datetime | None,
- default_monitor: BibliothecaPurchaseMonitor,
- ):
- # ISO 8601 strings, `datetime`s, or None are valid.
- actual_default_start = default_monitor._optional_iso_date(
- specified_default_start
- )
- if expected_default_start is not None:
- assert isinstance(actual_default_start, datetime)
- assert actual_default_start == expected_default_start
-
- def test_monitor_intrinsic_start_time(
- self,
- default_monitor: BibliothecaPurchaseMonitor,
- initialized_monitor: BibliothecaPurchaseMonitor,
- bibliotheca_fixture: BibliothecaAPITestFixture,
- ):
- db = bibliotheca_fixture.db
- # No `default_start` time is specified for either `default_monitor` or
- # `initialized_monitor`, so each monitor's `default_start_time` should
- # match the monitor class's intrinsic start time.
- for monitor in [default_monitor, initialized_monitor]:
- expected_intrinsic_start = BibliothecaPurchaseMonitor.DEFAULT_START_TIME
- intrinsic_start = monitor._intrinsic_start_time(db.session)
- assert isinstance(intrinsic_start, datetime)
- assert intrinsic_start == expected_intrinsic_start
- assert intrinsic_start == monitor.default_start_time
-
- @pytest.mark.parametrize(
- "specified_default_start, override_timestamp, expected_start",
- [
- (
- "2011-10-05T15:27",
- False,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (
- "2011-10-05T15:27:33",
- False,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- (None, False, None),
- (None, True, None),
- (
- "2011-10-05T15:27",
- True,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (
- "2011-10-05T15:27:33",
- True,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- ],
- )
- def test_specified_start_trumps_intrinsic_default_start(
- self,
- specified_default_start: str | None,
- override_timestamp: bool,
- expected_start: datetime | None,
- bibliotheca_fixture: BibliothecaAPITestFixture,
- ):
- db = bibliotheca_fixture.db
- # When a valid `default_start` parameter is specified, it -- not the monitor's
- # intrinsic default -- will always become the monitor's `default_start_time`.
- monitor = BibliothecaPurchaseMonitor(
- db.session,
- bibliotheca_fixture.collection,
- api_class=MockBibliothecaAPI,
- default_start=specified_default_start,
- override_timestamp=override_timestamp,
- )
- monitor_intrinsic_default = monitor._intrinsic_start_time(db.session)
- assert isinstance(monitor.default_start_time, datetime)
- assert isinstance(monitor_intrinsic_default, datetime)
- if specified_default_start:
- assert monitor.default_start_time == expected_start
- else:
- assert (
- abs(
- (
- monitor_intrinsic_default - monitor.default_start_time
- ).total_seconds()
- )
- <= 1
- )
-
- # If no `default_date` specified, then `override_timestamp` must be false.
- if not specified_default_start:
- assert monitor.override_timestamp is False
-
- # For an uninitialized monitor (no timestamp), the monitor's `default_start_time`,
- # whether from a specified `default_start` or the monitor's intrinsic start time,
- # will be the actual start time. The cut-off will be roughly the current time, in
- # either case.
- expected_cutoff = utc_now()
- with mock.patch.object(
- monitor, "catch_up_from", return_value=None
- ) as catch_up_from:
- monitor.run()
- actual_start, actual_cutoff, progress = catch_up_from.call_args[0]
- assert abs((expected_cutoff - actual_cutoff).total_seconds()) <= 1
- assert actual_cutoff == progress.finish
- assert actual_start == monitor.default_start_time
- assert progress.start == monitor.default_start_time
-
- @pytest.mark.parametrize(
- "specified_default_start, override_timestamp, expected_start",
- [
- (
- "2011-10-05T15:27",
- False,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (
- "2011-10-05T15:27:33",
- False,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- (None, False, None),
- (None, True, None),
- (
- "2011-10-05T15:27",
- True,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27),
- ),
- (
- "2011-10-05T15:27:33",
- True,
- datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33),
- ),
- ],
- )
- def test_specified_start_can_override_timestamp(
- self,
- specified_default_start: str | None,
- override_timestamp: bool,
- expected_start: datetime | None,
- bibliotheca_fixture: BibliothecaAPITestFixture,
- ):
- monitor = BibliothecaPurchaseMonitor(
- bibliotheca_fixture.db.session,
- bibliotheca_fixture.collection,
- api_class=MockBibliothecaAPI,
- default_start=specified_default_start,
- override_timestamp=override_timestamp,
- )
- # For an initialized monitor, the `default_start_time` will be derived from
- # `timestamp.finish`, unless overridden by a specified `default_start` when
- # `override_timestamp` is specified as True.
- ts = Timestamp.stamp(
- bibliotheca_fixture.db.session,
- service=monitor.service_name,
- service_type=Timestamp.MONITOR_TYPE,
- collection=monitor.collection,
- )
- assert isinstance(ts.finish, datetime)
- start_time_from_ts = ts.finish - BibliothecaPurchaseMonitor.OVERLAP
- expected_actual_start_time = (
- expected_start if monitor.override_timestamp else start_time_from_ts
- )
- expected_cutoff = utc_now()
- with mock.patch.object(
- monitor, "catch_up_from", return_value=None
- ) as catch_up_from:
- monitor.run()
- actual_start, actual_cutoff, progress = catch_up_from.call_args[0]
- assert abs((expected_cutoff - actual_cutoff).total_seconds()) <= 1
- assert actual_cutoff == progress.finish
- assert actual_start == expected_actual_start_time
- assert progress.start == expected_actual_start_time
-
- @pytest.mark.parametrize("input", [("invalid"), ("2020/10"), (["2020-10-05"])])
- def test_optional_iso_date_invalid_dates(
- self,
- input: list[str] | str,
- default_monitor: BibliothecaPurchaseMonitor,
- ):
- with pytest.raises(ValueError) as excinfo:
- default_monitor._optional_iso_date(input) # type: ignore[arg-type]
-
- def test_catch_up_from(self, default_monitor: BibliothecaPurchaseMonitor):
- # catch_up_from() slices up its given timespan, calls
- # purchases() to find purchases for each slice, processes each
- # purchase using process_record(), and sets a checkpoint for each
- # slice that is unambiguously in the past.
- today = utc_now().date()
-
- # _checkpoint() will be called after processing this slice
- # because it's a full slice that ends before today.
- full_slice = [datetime_utc(2014, 1, 1), datetime_utc(2014, 1, 2), True]
-
- # _checkpoint() is not called after processing this slice
- # because it's not a full slice.
- incomplete_slice = [datetime_utc(2015, 1, 1), datetime_utc(2015, 1, 2), False]
-
- # _checkpoint() is not called after processing this slice,
- # even though it's supposedly complete, because today isn't
- # over yet.
- today_slice = [today - timedelta(days=1), today, True]
-
- # _checkpoint() is not called after processing this slice
- # because it doesn't end in the past.
- future_slice = [today + timedelta(days=1), today + timedelta(days=2), True]
-
- default_monitor.slice_timespan = MagicMock(
- return_value=[full_slice, incomplete_slice, today_slice, future_slice]
- )
- default_monitor.purchases = MagicMock(return_value=["A record"])
- default_monitor.process_record = MagicMock()
- default_monitor._checkpoint = MagicMock()
-
- # Execute.
- progress = TimestampData()
- start = datetime_utc(2019, 1, 1)
- cutoff = datetime_utc(2020, 1, 1)
- default_monitor.catch_up_from(start, cutoff, progress)
-
- # slice_timespan was called once.
- default_monitor.slice_timespan.assert_called_once_with(
- start, cutoff, timedelta(days=1)
- )
-
- # purchases() was called on each slice it returned.
- default_monitor.purchases.assert_has_calls(
- [
- mock.call(*x[:2])
- for x in (full_slice, incomplete_slice, today_slice, future_slice)
- ]
- )
-
- # Each purchases() call returned a single record, which was
- # passed into process_record along with the start date of the
- # current slice.
- default_monitor.process_record.assert_has_calls(
- [
- mock.call("A record", x[0])
- for x in [full_slice, incomplete_slice, today_slice, future_slice]
- ]
- )
-
- # TimestampData.achievements was set to the total number of
- # records processed.
- assert progress.achievements == "MARC records processed: 4"
-
- # Only one of our contrived time slices -- the first one --
- # was a full slice that ended before the current
- # date. _checkpoint was called on that slice, and only that
- # slice.
- default_monitor._checkpoint.assert_called_once_with(
- progress, start, full_slice[0], "MARC records processed: 1"
- )
-
- def test__checkpoint(self, default_monitor: BibliothecaPurchaseMonitor):
- # The _checkpoint method allows the BibliothecaPurchaseMonitor
- # to preserve its progress in case of a crash.
-
- # The Timestamp for the default monitor shows that it has
- # a start date but it's never successfully completed.
- timestamp_obj = default_monitor.timestamp()
- assert timestamp_obj.achievements is None
- assert timestamp_obj.start == BibliothecaPurchaseMonitor.DEFAULT_START_TIME
- assert timestamp_obj.finish is None
-
- timestamp_data = TimestampData()
- finish = datetime_utc(2020, 1, 1)
- achievements = "Some achievements"
-
- default_monitor._checkpoint(
- timestamp_data, timestamp_obj.start, finish, achievements
- )
-
- # Calling _checkpoint creates the impression that the monitor
- # completed at the checkpoint, even though in point of fact
- # it's still running.
- timestamp_obj = default_monitor.timestamp()
- assert timestamp_obj.achievements == achievements
- assert timestamp_obj.start == BibliothecaPurchaseMonitor.DEFAULT_START_TIME
- assert timestamp_obj.finish == finish
-
- def test_purchases(self, default_monitor: BibliothecaPurchaseMonitor):
- # The purchases() method calls marc_request repeatedly, handling
- # pagination.
-
- # Mock three pages that contain 50, 50, and 49 items.
- default_monitor.api.marc_request = MagicMock(
- side_effect=[[1] * 50, [2] * 50, [3] * 49]
- )
- start = datetime_utc(2020, 1, 1)
- end = datetime_utc(2020, 1, 2)
- records = [x for x in default_monitor.purchases(start, end)]
-
- # marc_request was called repeatedly with increasing offsets
- # until it returned fewer than 50 results.
- default_monitor.api.marc_request.assert_has_calls(
- [mock.call(start, end, offset, 50) for offset in (1, 51, 101)]
- )
-
- # Every "record" it returned was yielded as part of a single
- # stream.
- assert ([1] * 50) + ([2] * 50) + ([3] * 49) == records
-
- def test_process_record(
- self,
- default_monitor: BibliothecaPurchaseMonitor,
- caplog: pytest.LogCaptureFixture,
- bibliotheca_fixture: BibliothecaAPITestFixture,
- ):
- # process_record may create a LicensePool, trigger the
- # bibliographic coverage provider, and/or issue a "license
- # added" analytics event, based on the identifier found in a
- # MARC record.
- purchase_time = utc_now()
- ensure_coverage = MagicMock()
- default_monitor.bibliographic_coverage_provider.ensure_coverage = (
- ensure_coverage
- )
-
- # Try some cases that won't happen in real life.
- multiple_control_numbers = b"""01034nam a22002413a 4500ehasb89abcde"""
- no_control_number = b"""01034nam a22002413a 4500"""
- for bad_record, expect_error in (
- (
- multiple_control_numbers,
- "Ignoring MARC record with multiple Bibliotheca control numbers.",
- ),
- (
- no_control_number,
- "Ignoring MARC record with no Bibliotheca control number.",
- ),
- ):
- [marc] = parse_xml_to_array(BytesIO(bad_record))
- assert default_monitor.process_record(marc, purchase_time) is None
- assert expect_error in caplog.messages[-1]
-
- # Now, try the two real cases.
- [ehasb89, oock89] = parse_xml_to_array(
- StringIO(
- bibliotheca_fixture.files.sample_data("marc_records_two.xml").decode(
- "utf8"
- )
- )
- )
-
- # If the book is new to this collection, it's run through
- # BibliothecaBibliographicCoverageProvider.ensure_coverage to
- # give it initial bibliographic and circulation data.
- pool = default_monitor.process_record(ehasb89, purchase_time)
- assert pool.identifier.identifier == "ehasb89"
- assert pool.identifier.type == Identifier.BIBLIOTHECA_ID
- assert pool.data_source.name == DataSource.BIBLIOTHECA
- assert bibliotheca_fixture.collection == pool.collection
- ensure_coverage.assert_called_once_with(pool.identifier, force=True)
-
- # If the book is already in this collection, ensure_coverage
- # is not called.
- pool, ignore = LicensePool.for_foreign_id(
- bibliotheca_fixture.db.session,
- DataSource.BIBLIOTHECA,
- Identifier.BIBLIOTHECA_ID,
- "3oock89",
- collection=bibliotheca_fixture.collection,
- )
- pool2 = default_monitor.process_record(oock89, purchase_time)
- assert pool == pool2
- assert ensure_coverage.call_count == 1 # i.e. was not called again.
-
- def test_end_to_end(
- self,
- default_monitor: BibliothecaPurchaseMonitor,
- bibliotheca_fixture: BibliothecaAPITestFixture,
- ):
- # Limited end-to-end test of the BibliothecaPurchaseMonitor.
-
- # Set the default start time to one minute in the past, so the
- # monitor doesn't feel the need to make more than one call to
- # the MARC endpoint.
- default_monitor.override_timestamp = True
- start_time = utc_now() - timedelta(minutes=1)
- default_monitor.default_start_time = start_time
-
- # There will be two calls to the mock API: one to the MARC
- # endpoint, which will tell us about the purchase of a single
- # book, and one to the metadata endpoint for information about
- # that book.
- api = cast(MockBibliothecaAPI, default_monitor.api)
- api.queue_response(
- 200, content=bibliotheca_fixture.files.sample_data("marc_records_one.xml")
- )
- api.queue_response(
- 200,
- content=bibliotheca_fixture.files.sample_data("item_metadata_single.xml"),
- )
- default_monitor.run()
-
- # One book was created.
- work = bibliotheca_fixture.db.session.query(Work).one()
-
- # Bibliographic information came from the coverage provider,
- # not from our fake MARC record (which is actually for a
- # different book).
- assert work.title == "The Incense Game"
-
- # Licensing information was also taken from the coverage
- # provider.
- [lp] = work.license_pools
- assert lp.identifier.identifier == "ddf4gr9"
- assert default_monitor.collection == lp.collection
- assert lp.licenses_owned == 1
- assert lp.licenses_available == 1
-
- # The timestamp has been updated; the next time the monitor
- # runs it will ask for purchases that haven't happened yet.
- default_monitor.override_timestamp = False
- timestamp = default_monitor.timestamp()
- assert timestamp.achievements == "MARC records processed: 1"
- assert timestamp.finish is not None
- assert timestamp.finish > start_time
-
-
-class TestBibliothecaPurchaseMonitorWhenMultipleCollections:
- def test_multiple_service_type_timestamps_with_start_date(
- self, bibliotheca_fixture: BibliothecaAPITestFixture
- ):
- db = bibliotheca_fixture.db
- # Start with multiple collections that have timestamps
- # because they've run before.
- collections = [
- MockBibliothecaAPI.mock_collection(
- db.session, db.default_library(), name="Collection 1"
- ),
- MockBibliothecaAPI.mock_collection(
- db.session, db.default_library(), name="Collection 2"
- ),
- ]
- for c in collections:
- Timestamp.stamp(
- db.session,
- service=BibliothecaPurchaseMonitor.SERVICE_NAME,
- service_type=Timestamp.MONITOR_TYPE,
- collection=c,
- )
- # Instantiate the associated monitors with a start date.
- monitors = [
- BibliothecaPurchaseMonitor(
- db.session, c, api_class=BibliothecaAPI, default_start="2011-02-03"
- )
- for c in collections
- ]
- assert len(monitors) == len(collections)
- # Ensure that we get monitors and not an exception.
- for m in monitors:
- assert isinstance(m, BibliothecaPurchaseMonitor)
-
-
class TestItemListParser:
def test_contributors_for_string(cls):
authors = list(
diff --git a/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py b/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py
new file mode 100644
index 0000000000..c474c2e040
--- /dev/null
+++ b/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py
@@ -0,0 +1,318 @@
+"""Unit tests for BibliothecaPurchaseRecordImporter."""
+
+from __future__ import annotations
+
+from datetime import timedelta
+from unittest.mock import MagicMock, patch
+
+from palace.util.datetime_helpers import datetime_utc, utc_now
+
+from palace.manager.celery.tasks import apply
+from palace.manager.integration.license.bibliotheca import BibliothecaAPI
+from palace.manager.integration.license.bibliotheca_purchase_record_importer import (
+ _MARC_PAGE_SIZE,
+ DEFAULT_PURCHASE_RECORD_START_TIME,
+ PURCHASE_RECORD_SERVICE_NAME,
+ BibliothecaPurchaseRecordImporter,
+ DayImportResult,
+)
+from palace.manager.sqlalchemy.model.coverage import Timestamp
+from tests.fixtures.database import DatabaseTransactionFixture
+from tests.mocks.bibliotheca import MockBibliothecaAPI
+
+
+def _make_importer(
+    db: DatabaseTransactionFixture,
+    api: MockBibliothecaAPI | None = None,
+) -> tuple[BibliothecaPurchaseRecordImporter, MockBibliothecaAPI]:
+    """Return an importer for a new mock collection, plus the API it was built with."""
+    session = db.session
+    collection = MockBibliothecaAPI.mock_collection(session, db.default_library())
+    client = api or MockBibliothecaAPI(session, collection)
+    return BibliothecaPurchaseRecordImporter(session, collection, api=client), client
+
+
+def _fake_marc_record(bibliotheca_id: str = "d5rf89") -> MagicMock:
+    """Build a mock MARC record whose only field is a 001 tag holding bibliotheca_id."""
+    control_field = MagicMock(tag="001")
+    control_field.value.return_value = bibliotheca_id
+
+    marc_record = MagicMock()
+    marc_record.fields = [control_field]
+    return marc_record
+
+
+class TestBibliothecaPurchaseRecordImporterGetStart:
+    def test_no_prior_timestamp_returns_default_start(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """get_start returns DEFAULT_PURCHASE_RECORD_START_TIME when nothing is stamped."""
+        importer, _api = _make_importer(db)
+        # No Timestamp has been stamped for this collection by the test.
+        assert importer.get_start() == DEFAULT_PURCHASE_RECORD_START_TIME
+
+    def test_prior_timestamp_returns_its_finish(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """get_start returns the finish time of the previously stamped Timestamp."""
+        importer, api = _make_importer(db)
+        previous_finish = utc_now() - timedelta(days=30)
+        # Stamp an earlier run whose finish lies 30 days in the past.
+        Timestamp.stamp(
+            db.session,
+            service=PURCHASE_RECORD_SERVICE_NAME,
+            service_type=Timestamp.MONITOR_TYPE,
+            collection=api.collection,
+            finish=previous_finish,
+        )
+
+        drift = importer.get_start() - previous_finish
+        assert abs(drift.total_seconds()) < 1
+
+
+class TestBibliothecaPurchaseRecordImporterImportDay:
+    def test_returns_day_result_with_correct_bounds(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """With an empty page, import_day reports a window of one day from current_day."""
+        importer, _api = _make_importer(db)
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])):
+            outcome = importer.import_day(day, limit)
+
+        assert isinstance(outcome, DayImportResult)
+        assert outcome.day_start == day
+        end_drift = outcome.day_end - (day + timedelta(days=1))
+        assert abs(end_drift.total_seconds()) < 1
+        assert outcome.records_handled == 0
+
+    def test_day_end_capped_at_cutoff(self, db: DatabaseTransactionFixture) -> None:
+        """day_end is clamped to cutoff when less than a full day remains before it."""
+        importer, _api = _make_importer(db)
+        limit = utc_now()
+        # Begin only three hours ahead of the cutoff, so a whole day cannot fit.
+        day = limit - timedelta(hours=3)
+
+        with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])):
+            outcome = importer.import_day(day, limit)
+
+        assert abs((outcome.day_end - limit).total_seconds()) < 1
+
+    def test_stamps_timestamp_to_day_end_when_day_complete(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """When the page is not full (here: empty), Timestamp.finish is set to day_end."""
+        importer, api = _make_importer(db)
+        collection = api.collection
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])):
+            outcome = importer.import_day(day, limit)
+
+        stamp = Timestamp.lookup(
+            db.session, PURCHASE_RECORD_SERVICE_NAME, Timestamp.MONITOR_TYPE, collection
+        )
+        assert stamp is not None
+        assert stamp.finish is not None
+        assert abs((stamp.finish - outcome.day_end).total_seconds()) < 1
+
+    def test_stamps_timestamp_to_current_day_when_page_full(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """After a full page the day is unfinished; Timestamp.finish is set to current_day."""
+        importer, api = _make_importer(db)
+        collection = api.collection
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        page = [_fake_marc_record(f"item{n}") for n in range(_MARC_PAGE_SIZE)]
+
+        with (
+            patch.object(BibliothecaAPI, "marc_request", return_value=iter(page)),
+            patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]),
+        ):
+            outcome = importer.import_day(day, limit)
+
+        assert outcome.next_offset is not None  # the day is not finished yet
+
+        stamp = Timestamp.lookup(
+            db.session, PURCHASE_RECORD_SERVICE_NAME, Timestamp.MONITOR_TYPE, collection
+        )
+        assert stamp is not None
+        assert stamp.finish is not None
+        assert abs((stamp.finish - day).total_seconds()) < 1
+
+    def test_counts_records_handled(self, db: DatabaseTransactionFixture) -> None:
+        """records_handled equals the number of records marc_request returned."""
+        importer, _api = _make_importer(db)
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        records = [_fake_marc_record(f"item{n}") for n in range(3)]
+
+        with (
+            patch.object(
+                BibliothecaAPI, "marc_request", return_value=iter(records)
+            ),
+            patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]),
+        ):
+            outcome = importer.import_day(day, limit)
+
+        assert outcome.records_handled == len(records)
+
+    def test_returns_next_offset_when_page_is_full(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """A page of _MARC_PAGE_SIZE records sets next_offset to offset + _MARC_PAGE_SIZE."""
+        importer, _api = _make_importer(db)
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        page = [_fake_marc_record(f"item{n}") for n in range(_MARC_PAGE_SIZE)]
+
+        with (
+            patch.object(BibliothecaAPI, "marc_request", return_value=iter(page)),
+            patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]),
+        ):
+            outcome = importer.import_day(day, limit, offset=1)
+
+        assert outcome.next_offset == 1 + _MARC_PAGE_SIZE
+
+    def test_returns_no_next_offset_when_page_is_partial(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """A page shorter than a full page (3 records here) leaves next_offset as None."""
+        importer, _api = _make_importer(db)
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        short_page = [_fake_marc_record(f"item{n}") for n in range(3)]
+
+        with (
+            patch.object(
+                BibliothecaAPI, "marc_request", return_value=iter(short_page)
+            ),
+            patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]),
+        ):
+            outcome = importer.import_day(day, limit, offset=51)
+
+        assert outcome.next_offset is None
+
+    def test_passes_offset_to_marc_request(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """The offset given to import_day is passed on as marc_request's third argument."""
+        importer, _api = _make_importer(db)
+        day = datetime_utc(2024, 1, 15)
+        limit = datetime_utc(2024, 1, 20)
+
+        with patch.object(
+            BibliothecaAPI, "marc_request", return_value=iter([])
+        ) as marc_request:
+            importer.import_day(day, limit, offset=101)
+
+        positional = marc_request.call_args.args
+        # Positional order is marc_request(start, end, offset, limit).
+        assert positional[2] == 101
+
+
+class TestBibliothecaPurchaseRecordImporterProcessRecord:
+    """Tests for _process_record logic, exercised via import_day."""
+
+    def test_creates_license_pool_for_valid_record(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """A record with a valid 001 field yields exactly one LicensePool for that ID."""
+        importer, mock_api = _make_importer(db)
+        collection = mock_api.collection
+        current_day = datetime_utc(2024, 1, 15)
+        cutoff = datetime_utc(2024, 1, 20)
+
+        with (
+            patch.object(
+                BibliothecaAPI,
+                "marc_request",
+                return_value=iter([_fake_marc_record("d5rf89")]),
+            ),
+            patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]),
+        ):
+            importer.import_day(current_day, cutoff)
+
+        pools = [
+            lp for lp in collection.licensepools if lp.identifier.identifier == "d5rf89"
+        ]
+        assert len(pools) == 1
+
+    def test_skips_record_with_no_control_number(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """A record with no 001 field is counted as handled but yields no LicensePool."""
+        importer, mock_api = _make_importer(db)
+        current_day = datetime_utc(2024, 1, 15)
+        cutoff = datetime_utc(2024, 1, 20)
+
+        # Record with no 001 field.
+        bad_record = MagicMock(fields=[])
+        bad_record.as_json.return_value = "{}"
+
+        with patch.object(
+            BibliothecaAPI, "marc_request", return_value=iter([bad_record])
+        ):
+            result = importer.import_day(current_day, cutoff)
+
+        # The record was "handled" (iterated) but produced no LicensePool.
+        assert result.records_handled == 1
+        assert not mock_api.collection.licensepools
+
+    def test_queues_bibliographic_apply_when_needed(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """bibliographic_apply is queued once when needs_apply() returns True."""
+        importer, _ = _make_importer(db)
+        mock_bib = MagicMock()
+        mock_bib.needs_apply.return_value = True
+        current_day = datetime_utc(2024, 1, 15)
+        cutoff = datetime_utc(2024, 1, 20)
+
+        with (
+            patch.object(
+                BibliothecaAPI,
+                "marc_request",
+                return_value=iter([_fake_marc_record()]),
+            ),
+            patch.object(
+                BibliothecaAPI, "bibliographic_lookup", return_value=[mock_bib]
+            ),
+            patch.object(apply, "bibliographic_apply") as mock_apply,
+        ):
+            importer.import_day(current_day, cutoff)
+
+        mock_apply.delay.assert_called_once()
+
+    def test_skips_bibliographic_apply_when_not_needed(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """bibliographic_apply is not queued when needs_apply() returns False."""
+        importer, _ = _make_importer(db)
+        mock_bib = MagicMock()
+        mock_bib.needs_apply.return_value = False
+        current_day = datetime_utc(2024, 1, 15)
+        cutoff = datetime_utc(2024, 1, 20)
+
+        with (
+            patch.object(
+                BibliothecaAPI,
+                "marc_request",
+                return_value=iter([_fake_marc_record()]),
+            ),
+            patch.object(
+                BibliothecaAPI, "bibliographic_lookup", return_value=[mock_bib]
+            ),
+            patch.object(apply, "bibliographic_apply") as mock_apply,
+        ):
+            importer.import_day(current_day, cutoff)
+
+        mock_apply.delay.assert_not_called()
diff --git a/tests/manager/integration/license/test_bibliotheca_scripts.py b/tests/manager/integration/license/test_bibliotheca_scripts.py
index a10df4549f..fa69c3ba90 100644
--- a/tests/manager/integration/license/test_bibliotheca_scripts.py
+++ b/tests/manager/integration/license/test_bibliotheca_scripts.py
@@ -11,6 +11,7 @@
from palace.manager.celery.tasks import bibliotheca
from palace.manager.integration.license.bibliotheca_scripts import (
ImportEventCollection,
+ ImportPurchaseRecordCollection,
)
from tests.fixtures.database import DatabaseTransactionFixture
from tests.mocks.bibliotheca import MockBibliothecaAPI
@@ -55,3 +56,50 @@ def test_both_args_raises(self, db: DatabaseTransactionFixture) -> None:
ImportEventCollection(db.session).do_run(
["--collection", collection.name, "--import-all"]
)
+
+
+class TestImportPurchaseRecordCollection:
+    def test_import_all_queues_import_purchase_records_for_all_collections(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """Running with --import-all queues the all-collections task with no arguments."""
+        with patch.object(
+            bibliotheca, "import_purchase_records_for_all_collections"
+        ) as fan_out_task:
+            ImportPurchaseRecordCollection(db.session).do_run(["--import-all"])
+        fan_out_task.delay.assert_called_once_with()
+
+    def test_collection_queues_import_purchase_records_by_collection(
+        self, db: DatabaseTransactionFixture
+    ) -> None:
+        """Running with --collection queues the per-collection task with that collection's id."""
+        collection = MockBibliothecaAPI.mock_collection(
+            db.session, db.default_library(), name="My Bibliotheca"
+        )
+        # Select the collection on the command line by its name.
+        cli_args = ["--collection", collection.name]
+        with patch.object(
+            bibliotheca, "import_purchase_records_by_collection"
+        ) as single_task:
+            ImportPurchaseRecordCollection(db.session).do_run(cli_args)
+        single_task.delay.assert_called_once_with(collection_id=collection.id)
+
+    def test_collection_not_found_raises(self, db: DatabaseTransactionFixture) -> None:
+        """An unknown --collection name is rejected with PalaceValueError."""
+        with pytest.raises(PalaceValueError, match='No collection found named "Ghost"'):
+            ImportPurchaseRecordCollection(db.session).do_run(["--collection", "Ghost"])
+
+    def test_no_args_raises(self, db: DatabaseTransactionFixture) -> None:
+        """Passing neither --collection nor --import-all raises SystemExit."""
+        with pytest.raises(SystemExit):
+            ImportPurchaseRecordCollection(db.session).do_run([])
+
+    def test_both_args_raises(self, db: DatabaseTransactionFixture) -> None:
+        """Passing --collection together with --import-all raises SystemExit."""
+        collection = MockBibliothecaAPI.mock_collection(
+            db.session, db.default_library()
+        )
+        # Both selection flags at once: the two options conflict.
+        conflicting_args = ["--collection", collection.name, "--import-all"]
+        with pytest.raises(SystemExit):
+            ImportPurchaseRecordCollection(db.session).do_run(conflicting_args)