diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9465c06eb1..ba42598276 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,7 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks +default_language_version: + python: python3.12 repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v6.0.0 diff --git a/bin/bibliotheca_purchase_monitor b/bin/bibliotheca_purchase_monitor deleted file mode 100755 index fb923b0cdc..0000000000 --- a/bin/bibliotheca_purchase_monitor +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python -"""Ask the Bibliotheca API about license purchases, potentially purchases -that happened many years in the past.""" - - -from palace.manager.integration.license.bibliotheca import ( - BibliothecaPurchaseMonitor, - RunBibliothecaPurchaseMonitorScript, -) - -RunBibliothecaPurchaseMonitorScript(BibliothecaPurchaseMonitor).run() diff --git a/bin/bibliotheca_purchase_record_import b/bin/bibliotheca_purchase_record_import new file mode 100755 index 0000000000..d4f43faf42 --- /dev/null +++ b/bin/bibliotheca_purchase_record_import @@ -0,0 +1,8 @@ +#!/usr/bin/env python +"""Manually kick off the Bibliotheca purchase record import for one or all collections.""" + +from palace.manager.integration.license.bibliotheca_scripts import ( + ImportPurchaseRecordCollection, +) + +ImportPurchaseRecordCollection().run() diff --git a/src/palace/manager/celery/tasks/bibliotheca.py b/src/palace/manager/celery/tasks/bibliotheca.py index d4feeca472..369d13945c 100644 --- a/src/palace/manager/celery/tasks/bibliotheca.py +++ b/src/palace/manager/celery/tasks/bibliotheca.py @@ -1,16 +1,22 @@ """Celery tasks for Bibliotheca (3M Cloud) collection management. -Two tasks handle near-real-time event import for Bibliotheca collections: +Four tasks handle near-real-time event import and historical purchase record import: - ``import_all_collections``: Fans out to one ``import_collection`` task per collection. - ``import_collection``: Processes one time slice of circulation events, then re-queues itself via ``task.replace()`` until the collection is caught up, holding a Redis workflow lock across the chain so at most one run proceeds per collection. +- ``import_purchase_records_for_all_collections``: Fans out to one + ``import_purchase_records_by_collection`` task per collection. +- ``import_purchase_records_by_collection``: Processes one page of MARC purchase + records, then re-queues itself via ``task.replace()`` until the collection is + caught up to ``utc_now()``, holding a separate Redis workflow lock so at most one + purchase record import run proceeds per collection at a time. """ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta from uuid import uuid4 from celery import shared_task @@ -26,7 +32,12 @@ EVENT_IMPORT_OVERLAP, BibliothecaEventImporter, ) +from palace.manager.integration.license.bibliotheca_purchase_record_importer import ( + BibliothecaPurchaseRecordImporter, +) from palace.manager.service.celery.celery import QueueNames +from palace.manager.service.redis.models.lock import RedisLock +from palace.manager.service.redis.redis import Redis from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.util.http.exception import ( BadResponseException, @@ -104,15 +115,12 @@ def import_collection( ) as workflow_lock_acquired: if not workflow_lock_acquired and is_first_slice: task.log.warning( - f"Bibliotheca event import skipped for collection {collection_id}: " - "another run is already in progress." + f"Bibliotheca event import skipped for collection {collection_id}: another run is already in progress." ) return if not workflow_lock_acquired and not is_first_slice: task.log.warning( - f"Bibliotheca event import for collection {collection_id}: " - "workflow lock expired between slices; continuing " - "(another run may be active)." + f"Bibliotheca event import for collection {collection_id}: workflow lock expired between slices; continuing (another run may be active)." ) cutoff = utc_now() - EVENT_IMPORT_OVERLAP @@ -152,3 +160,168 @@ def import_collection( lock_value=lock_value, ) ) + + +# --------------------------------------------------------------------------- +# Purchase record importer +# --------------------------------------------------------------------------- + + +def _purchase_record_workflow_lock( + client: Redis, collection_id: int, random_value: str +) -> RedisLock: + """Create a workflow-level lock for the purchase record importer. + + Uses a key distinct from ``import_workflow_lock`` so that event-import + and purchase-record-import runs for the same collection do not block each other. + """ + return RedisLock( + client, + [ + "PurchaseRecordCollectionWorkflow", + Collection.redis_key_from_id(collection_id), + ], + random_value=random_value, + lock_timeout=timedelta(hours=2), + ) + + +@shared_task(queue=QueueNames.default, bind=True) +def import_purchase_records_for_all_collections(task: Task) -> None: + """Queue an ``import_purchase_records_by_collection`` task for every Bibliotheca collection.""" + with task.session() as session: + registry = task.services.integration_registry().license_providers() + collection_query = Collection.select_by_protocol( + BibliothecaAPI, registry=registry + ) + collections = session.scalars(collection_query).all() + + for collection in collections: + import_purchase_records_by_collection.delay(collection_id=collection.id) + + task.log.info( + f"Queued {len(collections)} Bibliotheca collection(s) for purchase record import." + ) + + +@shared_task( + queue=QueueNames.default, + bind=True, + max_retries=4, + autoretry_for=(BadResponseException, RequestTimedOut), + throws=(RemoteIntegrationException,), + retry_backoff=60, +) +def import_purchase_records_by_collection( + task: Task, + collection_id: int, + *, + current_day: datetime | None = None, + offset: int = 1, + lock_value: str | None = None, +) -> None: + """Process one page of Bibliotheca MARC purchase records. + + Fetches up to 50 MARC records for ``[current_day, current_day+1day]`` + starting at ``offset``, creates ``LicensePool`` entries, queues + ``bibliographic_apply`` for new or changed titles, then re-queues itself + via ``task.replace()``: + + - with ``offset`` advanced when the current page was full (more records + remain for the same day), or + - with ``current_day`` advanced to the next day and ``offset`` reset to 1 + when the current day is fully processed. + + This continues until the collection is caught up to ``utc_now()``. + + A Redis workflow lock keyed to ``collection_id`` (prefix + ``PurchaseRecordCollectionWorkflow``) ensures at most one purchase-record-import + chain runs per collection at a time. The lock is held across ``task.replace()`` + calls and Celery retries so that a transient API failure does not open a + window for a second concurrent run. + + :param collection_id: Database ID of the Bibliotheca collection. + :param current_day: Start of the day to process. ``None`` on the first + invocation — the start is derived from the stored ``Timestamp`` + (defaulting to 2014-01-01 when no timestamp exists). + :param offset: 1-based record offset within the current day's result set. + Defaults to ``1`` (the first page). + :param lock_value: UUID identifying this workflow. Generated on the first + invocation and forwarded unchanged to every subsequent page/day so the + lock is held across ``task.replace()`` calls. + """ + redis = task.services.redis().client() + + is_first_day = lock_value is None + if lock_value is None: + lock_value = str(uuid4()) + + workflow_lock = _purchase_record_workflow_lock(redis, collection_id, lock_value) + + with workflow_lock.lock( + raise_when_not_acquired=False, + # Hold the lock across task.replace() (Ignore) and Celery retries + # (BadResponseException, RequestTimedOut) so a transient API failure + # does not open a window for a concurrent run on the same collection. + ignored_exceptions=(Ignore, BadResponseException, RequestTimedOut), + ) as workflow_lock_acquired: + if not workflow_lock_acquired and is_first_day: + task.log.warning( + f"Bibliotheca purchase record import skipped for collection {collection_id}: another run is already in progress." + ) + return + if not workflow_lock_acquired and not is_first_day: + task.log.warning( + f"Bibliotheca purchase record import for collection {collection_id}: workflow lock expired between days; continuing (another run may be active)." + ) + + cutoff = utc_now() + result = None + collection_name: str | None = None + + with task.transaction() as session: + collection = load_from_id(session, Collection, collection_id) + collection_name = collection.name + importer = BibliothecaPurchaseRecordImporter(session, collection) + + if current_day is None: + current_day = importer.get_start() + + if current_day >= cutoff: + task.log.info( + f"Bibliotheca purchase record import: '{collection_name}' is already up to date." + ) + return + + result = importer.import_day(current_day, cutoff, offset) + + assert result is not None + + task.log.info( + f"Bibliotheca purchase record import: handled {result.records_handled} record(s) for " + f"'{collection_name}' " + f"({result.day_start.strftime('%Y-%m-%dT%H:%M:%S')} -> " + f"{result.day_end.strftime('%Y-%m-%dT%H:%M:%S')}, offset {offset})." + ) + + if result.next_offset is not None: + # More pages remain for the current day. + raise task.replace( + task.s( + collection_id=collection_id, + current_day=current_day, + offset=result.next_offset, + lock_value=lock_value, + ) + ) + + if result.day_end < cutoff: + # Current day is complete; advance to the next day. + raise task.replace( + task.s( + collection_id=collection_id, + current_day=result.day_end, + offset=1, + lock_value=lock_value, + ) + ) diff --git a/src/palace/manager/integration/license/bibliotheca.py b/src/palace/manager/integration/license/bibliotheca.py index a11472e3b7..34e5017967 100644 --- a/src/palace/manager/integration/license/bibliotheca.py +++ b/src/palace/manager/integration/license/bibliotheca.py @@ -10,13 +10,11 @@ import time import urllib.parse from abc import ABC -from argparse import ArgumentParser, Namespace -from collections.abc import Collection as CollectionT, Generator, Iterable, Sequence +from collections.abc import Collection as CollectionT, Generator, Iterable from datetime import datetime, timedelta from io import BytesIO from typing import Annotated, Any, Literal, Optional, Unpack, overload -import dateutil.parser from flask_babel import lazy_gettext as _ from frozendict import frozendict from lxml.etree import Error, _Element @@ -25,7 +23,6 @@ from sqlalchemy.orm import Session from palace.util.datetime_helpers import ( - datetime_utc, strptime_utc, to_utc, utc_now, @@ -64,12 +61,7 @@ ConfigurationAttributeValue, ) from palace.manager.core.coverage import BibliographicCoverageProvider, CoverageFailure -from palace.manager.core.monitor import ( - CollectionMonitor, - IdentifierSweepMonitor, - TimelineMonitor, - TimestampData, -) +from palace.manager.core.monitor import IdentifierSweepMonitor from palace.manager.core.selftest import SelfTestResult from palace.manager.data_layer.bibliographic import BibliographicData from palace.manager.data_layer.circulation import CirculationData @@ -84,13 +76,11 @@ FormFieldType, FormMetadata, ) -from palace.manager.scripts.monitor import RunCollectionMonitorScript from palace.manager.sqlalchemy.constants import DataSourceConstants from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.classification import Classification, Subject from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.contributor import Contributor -from palace.manager.sqlalchemy.model.coverage import Timestamp from palace.manager.sqlalchemy.model.datasource import DataSource from palace.manager.sqlalchemy.model.edition import Edition from palace.manager.sqlalchemy.model.identifier import Identifier @@ -103,7 +93,6 @@ from palace.manager.sqlalchemy.model.measurement import Measurement from palace.manager.sqlalchemy.model.patron import Patron from palace.manager.sqlalchemy.model.resource import Hyperlink, Representation -from palace.manager.sqlalchemy.util import get_one from palace.manager.util import base64 from palace.manager.util.http.exception import RemoteIntegrationException from palace.manager.util.http.http import HTTP, RequestKwargs @@ -1316,337 +1305,6 @@ def _process_bibliographic( ) -class BibliothecaTimelineMonitor(CollectionMonitor, TimelineMonitor): - """Common superclass for our two TimelineMonitors.""" - - PROTOCOL = BibliothecaAPI.label() - LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" - - def __init__( - self, - _db: Session, - collection: Collection, - api_class: BibliothecaApiClassT, - ) -> None: - """Initializer. - - :param _db: Database session object. - :param collection: Collection for which this monitor operates. - :param api_class: API class or an instance thereof for this monitor. - """ - super().__init__(_db, collection) - if isinstance(api_class, BibliothecaAPI): - # We were given an actual API object. Just use it. - self.api = api_class - else: - self.api = api_class(_db, collection) - self.replacement_policy = ReplacementPolicy.from_license_source() - self.bibliographic_coverage_provider = BibliothecaBibliographicCoverageProvider( - collection, self.api, replacement_policy=self.replacement_policy - ) - - -class BibliothecaPurchaseMonitor(BibliothecaTimelineMonitor): - """Track purchases of licenses from Bibliotheca. - - Most TimelineMonitors monitor the timeline starting at whatever - time they're first run. But it's crucial that this monitor start - at or before the first day on which a book was added to this - collection, even if that date was years in the past. That's - because this monitor may be the only time we hear about a - particular book. - - Because of this, this monitor has a very old DEFAULT_START_TIME - and special capabilities for customizing the start_time to go back - even further. - """ - - SERVICE_NAME = "Bibliotheca Purchase Monitor" - DEFAULT_START_TIME = datetime_utc(2014, 1, 1) - - def __init__( - self, - _db: Session, - collection: Collection, - api_class: BibliothecaApiClassT = BibliothecaAPI, - default_start: str | datetime | None = None, - override_timestamp: bool = False, - ) -> None: - """Initializer. - - :param _db: Database session object. - :param collection: Collection for which this monitor operates. - :param api_class: API class or an instance thereof for this monitor. - :param default_start: A default date/time at which to start - requesting events. It should be specified as a `datetime` or - an ISO 8601 string. If not provided, the monitor's calculated - intrinsic default will be used. - :param override_timestamp: Boolean indicating whether - `default_start` should take precedence over the timestamp - for an already initialized monitor. - """ - super().__init__(_db=_db, collection=collection, api_class=api_class) - - # We should only force the use of `default_start` as the actual - # start time if it was passed in. - self.override_timestamp = override_timestamp if default_start else False - # A specified `default_start` takes precedence over the - # monitor's intrinsic default start date/time. - self.default_start_time = self._optional_iso_date( - default_start - ) or self._intrinsic_start_time(_db) - - def _optional_iso_date(self, date: str | datetime | None) -> datetime | None: - """Return the date in `datetime` format. - - :param date: A date/time value, specified as either an ISO 8601 - string or as a `datetime`. - :return: Optional datetime. - """ - if date is None or isinstance(date, datetime): - return to_utc(date) - try: - dt_date = to_utc(dateutil.parser.isoparse(date)) - except ValueError as e: - self.log.warning( - '%r. Date argument "%s" was not in a valid format. Use an ISO 8601 string or a datetime.', - e, - date, - ) - raise - return dt_date - - def _intrinsic_start_time(self, _db: Session) -> datetime: - """Return the intrinsic start time for this monitor. - - The intrinsic start time is the time at which this monitor would - start if it were uninitialized (no timestamp) and no `default_start` - parameter were supplied. It is `self.DEFAULT_START_TIME`. - - :param _db: Database session object. - - :return: datetime representing a default start time. - """ - # We don't use Monitor.timestamp() because that will create - # the timestamp if it doesn't exist -- we want to see whether - # or not it exists. - default_start_time = self.DEFAULT_START_TIME - initialized = get_one( - _db, - Timestamp, - service=self.service_name, - service_type=Timestamp.MONITOR_TYPE, - collection=self.collection, - ) - if not initialized: - self.log.info( - "Initializing %s from date: %s.", - self.service_name, - default_start_time.strftime(self.LOG_DATE_FORMAT), - ) - return default_start_time - - def timestamp(self) -> Timestamp: - """Find or create a Timestamp for this Monitor. - - If we are overriding the normal start time with one supplied when - the this class was instantiated, we do that here. The instance's - `default_start_time` will have been set to the specified datetime - and setting`timestamp.finish` to None will cause the default to - be used. - """ - timestamp = super().timestamp() - if self.override_timestamp: - self.log.info( - "Overriding timestamp and starting at %s.", - datetime.strftime(self.default_start_time, self.LOG_DATE_FORMAT), - ) - timestamp.finish = None - return timestamp # type: ignore[no-any-return] - - def catch_up_from( - self, start: datetime, cutoff: datetime, progress: TimestampData - ) -> None: - """Ask the Bibliotheca API about new purchases for every - day between `start` and `cutoff`. - - :param start: The first day to ask about. - :param cutoff: The last day to ask about. - :param progress: Object used to record progress through the timeline. - """ - num_records = 0 - # Ask the Bibliotheca API for one day of data at a time. This - # ensures that TITLE_ADD events are associated with the day - # the license was purchased. - today = utc_now().date() - achievement_template = "MARC records processed: %s" - for slice_start, slice_end, is_full_slice in self.slice_timespan( - start, cutoff, timedelta(days=1) - ): - for record in self.purchases(slice_start, slice_end): - self.process_record(record, slice_start) - num_records += 1 - if isinstance(slice_end, datetime): - slice_end_as_date = slice_end.date() - else: - slice_end_as_date = slice_end - if is_full_slice and slice_end_as_date < today: - # We have finished processing a date in the past. - # There can never be more licenses purchased for that - # day. Treat this as a checkpoint. - # - # We're playing it safe by using slice_start instead - # of slice_end here -- slice_end should be fine. - self._checkpoint( - progress, start, slice_start, achievement_template % num_records - ) - # We're all caught up. The superclass will take care of - # finalizing the dates, so there's no need to explicitly - # set a checkpoint. - progress.achievements = achievement_template % num_records - - def _checkpoint( - self, - progress: TimestampData, - start: datetime, - finish: datetime, - achievements: str, - ) -> None: - """Set the monitor's progress so that if it crashes later on it will - start from this point, reducing duplicate work. - - This is especially important for this monitor, which usually - starts several years in the past. TODO: However it might be - useful to make this a general feature of TimelineMonitor. - - :param progress: Object used to record progress through the timeline. - - :param start: New value for `progress.start` - :param finish: New value for `progress.finish` - :param achievements: The monitor's achievements thus far. - """ - progress.start = start - progress.finish = finish - progress.achievements = achievements - progress.finalize( - service=self.service_name, - service_type=Timestamp.MONITOR_TYPE, - collection=self.collection, - ) - progress.apply(self._db) - self._db.commit() - - def purchases(self, start: datetime, end: datetime) -> Generator[Record]: - """Ask Bibliotheca for a MARC record for each book purchased - between `start` and `end`. - - :yield: A sequence of pymarc Record objects - """ - offset = 1 # Smallest allowed offset - page_size = 50 # Maximum supported size. - records = None - while records is None or len(records) >= page_size: - records = [x for x in self.api.marc_request(start, end, offset, page_size)] - yield from records - offset += page_size - - def process_record( - self, record: Record, purchase_time: datetime - ) -> LicensePool | None: - """Record the purchase of a new title. - - :param record: Bibliographic information about the new title. - :param purchase_time: Put down this time as the time the - purchase happened. - - :return: A LicensePool representing the new title. - """ - # The control number associated with the MARC record is what - # we call the Bibliotheca ID. - control_numbers = [x for x in record.fields if x.tag == "001"] - # These errors should not happen in real usage. - error = None - if not control_numbers: - error = "Ignoring MARC record with no Bibliotheca control number." - elif len(control_numbers) > 1: - error = "Ignoring MARC record with multiple Bibliotheca control numbers." - if error is not None: - self.log.error(error + " " + record.as_json()) - return None - - # At this point we know there is one and only one control - # number. - bibliotheca_id = control_numbers[0].value() - - # Find or lookup a LicensePool from the control number. - license_pool, is_new = LicensePool.for_foreign_id( - self._db, - self.api.data_source, - Identifier.BIBLIOTHECA_ID, - bibliotheca_id, - collection=self.collection, - ) - - if is_new: - # We've never seen this book before. Immediately acquire - # bibliographic coverage for it. This will set the - # DistributionMechanisms and make the book - # presentation-ready. - # - # We have most of the bibliographic information in the - # MARC record itself, but using the - # BibliographicCoverageProvider saves code and also gives - # us up-to-date circulation information. - coverage_record = self.bibliographic_coverage_provider.ensure_coverage( - license_pool.identifier, force=True - ) - - return license_pool - - -class RunBibliothecaPurchaseMonitorScript(RunCollectionMonitorScript): - """Adds the ability to specify a particular start date for the - BibliothecaPurchaseMonitor. This is important because for a given - collection, the start date needs to be before books - started being licensed into that collection. - """ - - @classmethod - def arg_parser(cls, _db: Session) -> ArgumentParser: - parser = super().arg_parser(_db) - parser.add_argument( - "--default-start", - metavar="DATETIME", - default=None, - type=dateutil.parser.isoparse, - help="Default start date/time to be used for uninitialized (no timestamp) monitors." - ' Use ISO 8601 format (e.g., "yyyy-mm-dd", "yyyy-mm-ddThh:mm:ss").' - " Do not specify a time zone or offset.", - ) - parser.add_argument( - "--override-timestamp", - action="store_true", - help="Use the specified `--default-start` as the actual" - " start date, even if a monitor is already initialized.", - ) - return parser - - @classmethod - def parse_command_line( - cls, - _db: Session, - cmd_args: Sequence[str | None] | None = None, - *args: Any, - **kwargs: Any, - ) -> Namespace: - parsed = super().parse_command_line(_db, cmd_args, *args, **kwargs) - if parsed.override_timestamp and not parsed.default_start: - cls.arg_parser(_db).error( - '"--override-timestamp" is valid only when "--default-start" is also specified.' - ) - return parsed - - class BibliothecaBibliographicCoverageProvider(BibliographicCoverageProvider): """Fill in bibliographic metadata for Bibliotheca records. diff --git a/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py b/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py new file mode 100644 index 0000000000..5e14ce9eb6 --- /dev/null +++ b/src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py @@ -0,0 +1,205 @@ +"""Importer for Bibliotheca (3M Cloud) MARC purchase records.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta + +from pymarc import Record +from sqlalchemy.orm import Session + +from palace.util.datetime_helpers import datetime_utc +from palace.util.log import LoggerMixin + +from palace.manager.celery.tasks import apply +from palace.manager.data_layer.policy.replacement import ReplacementPolicy +from palace.manager.integration.license.bibliotheca import BibliothecaAPI +from palace.manager.sqlalchemy.model.collection import Collection +from palace.manager.sqlalchemy.model.coverage import Timestamp +from palace.manager.sqlalchemy.model.identifier import Identifier +from palace.manager.sqlalchemy.model.licensing import LicensePool + +PURCHASE_RECORD_SERVICE_NAME = "Bibliotheca Purchase Record Importer" + +# The importer starts from this date when no prior Timestamp exists. +# Bibliotheca collections typically go back to 2014-01-01. +DEFAULT_PURCHASE_RECORD_START_TIME = datetime_utc(2014, 1, 1) + +_LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S" + +# Maximum number of MARC records per API page (Bibliotheca hard limit). +_MARC_PAGE_SIZE = 50 + + +@dataclass(frozen=True) +class DayImportResult: + """Result of importing one page of Bibliotheca MARC purchase records. + + :param records_handled: Number of MARC records processed in this page. + :param day_start: Start of the day being processed (inclusive). + :param day_end: End of the day window (exclusive). + Equal to ``min(day_start + 1 day, cutoff)``. + :param next_offset: Offset to pass to the next ``import_day`` call for + the same day, or ``None`` when the page was smaller than the maximum + (meaning the day is fully processed and the caller should advance to + ``day_end``). + """ + + records_handled: int + day_start: datetime + day_end: datetime + next_offset: int | None + + +class BibliothecaPurchaseRecordImporter(LoggerMixin): + """Imports Bibliotheca MARC purchase records one page at a time. + + Each call to :meth:`import_day` processes one API page (up to + :data:`_MARC_PAGE_SIZE` records) for a given day. Callers advance + through pages of the same day (via ``next_offset``) and then advance + to the next day (via ``day_end``) until the collection is caught up + to the cutoff. + """ + + def __init__( + self, + session: Session, + collection: Collection, + api: BibliothecaAPI | None = None, + ) -> None: + """ + :param session: Database session. + :param collection: The Bibliotheca collection to import records for. + :param api: Optional pre-constructed API instance; created from + ``session`` and ``collection`` if not supplied. + """ + self._session = session + self._collection = collection + self._api = api or BibliothecaAPI(session, collection) + + def get_start(self) -> datetime: + """Return the start of the next day to import from the stored ``Timestamp``. + + Falls back to :data:`DEFAULT_PURCHASE_RECORD_START_TIME` (2014-01-01) when no + prior run has been recorded, so the first run begins a full historical + backfill from the earliest possible purchase record date. + + :returns: The start datetime for the next import day. + """ + timestamp = Timestamp.lookup( + self._session, + PURCHASE_RECORD_SERVICE_NAME, + Timestamp.MONITOR_TYPE, + self._collection, + ) + if timestamp is None or timestamp.finish is None: + return DEFAULT_PURCHASE_RECORD_START_TIME + finish: datetime = timestamp.finish + return finish + + def import_day( + self, current_day: datetime, cutoff: datetime, offset: int = 1 + ) -> DayImportResult: + """Import one page of MARC purchase records for a given day. + + Fetches up to :data:`_MARC_PAGE_SIZE` records from the window + ``[current_day, day_end]`` starting at ``offset``, where + ``day_end = min(current_day + 1 day, cutoff)``. + + The ``Timestamp`` is always updated after the page is processed: + to ``current_day`` while the day is still in progress (so a restart + after a crash resumes from the beginning of this day rather than an + earlier day), and to ``day_end`` once the page is smaller than the + maximum (signalling that the day is fully processed). + + :param current_day: Start of the day to process. + :param cutoff: Upper bound of the import window; the day will not + extend past this point. + :param offset: 1-based record offset within the day's result set. + Defaults to ``1`` (the first page). + :returns: A :class:`DayImportResult` describing what was processed. + Check :attr:`~DayImportResult.next_offset` to determine whether + to re-queue for the same day or advance to the next. + """ + day_end = min(current_day + timedelta(days=1), cutoff) + + self.log.info( + f"Bibliotheca purchase record import: requesting MARC records for " + f"'{self._collection.name}' between " + f"{current_day.strftime(_LOG_DATE_FORMAT)} and " + f"{day_end.strftime(_LOG_DATE_FORMAT)}, offset {offset}." + ) + + records = list( + self._api.marc_request(current_day, day_end, offset, _MARC_PAGE_SIZE) + ) + for record in records: + self._process_record(record, current_day) + + records_handled = len(records) + day_complete = records_handled < _MARC_PAGE_SIZE + next_offset = None if day_complete else offset + _MARC_PAGE_SIZE + + # Always checkpoint: advance finish to day_end when the day is done, + # or to current_day while still in progress, so a restart after a + # crash resumes from this day rather than the previous one. + Timestamp.stamp( + self._session, + service=PURCHASE_RECORD_SERVICE_NAME, + service_type=Timestamp.MONITOR_TYPE, + collection=self._collection, + start=current_day, + finish=day_end if day_complete else current_day, + achievements=f"MARC records processed: {records_handled}.", + ) + + return DayImportResult( + records_handled=records_handled, + day_start=current_day, + day_end=day_end, + next_offset=next_offset, + ) + + def _process_record(self, record: Record, purchase_record_time: datetime) -> None: + """Process a single Bibliotheca MARC purchase record. + + Extracts the Bibliotheca ID from MARC field ``001``, creates or finds + the ``LicensePool``, then queues a ``bibliographic_apply`` task when + the title's metadata has changed (hash-based deduplication). + + :param record: A pymarc ``Record`` representing one purchased title. + :param purchase_record_time: Timestamp of the purchase record day. + """ + control_numbers = [f for f in record.fields if f.tag == "001"] + if not control_numbers: + self.log.error( + f"Ignoring MARC record with no Bibliotheca control number. {record.as_json()}" + ) + return + if len(control_numbers) > 1: + self.log.error( + f"Ignoring MARC record with multiple Bibliotheca control numbers. {record.as_json()}" + ) + return + + bibliotheca_id = control_numbers[0].value() + + LicensePool.for_foreign_id( + self._session, + self._api.data_source, + Identifier.BIBLIOTHECA_ID, + bibliotheca_id, + collection=self._collection, + ) + + for bibliographic in self._api.bibliographic_lookup(bibliotheca_id): + if bibliographic.needs_apply(self._session): + apply.bibliographic_apply.delay( + bibliographic, + collection_id=self._collection.id, + replace=ReplacementPolicy.from_license_source(), + ) + + self.log.info( + f"{purchase_record_time.strftime(_LOG_DATE_FORMAT)}: processed purchase record for Bibliotheca ID {bibliotheca_id}" + ) diff --git a/src/palace/manager/integration/license/bibliotheca_scripts.py b/src/palace/manager/integration/license/bibliotheca_scripts.py index c2651a879d..6ffe13513f 100644 --- a/src/palace/manager/integration/license/bibliotheca_scripts.py +++ b/src/palace/manager/integration/license/bibliotheca_scripts.py @@ -51,3 +51,47 @@ def do_run(self, cmd_args: list[str] | None = None) -> None: self.log.info( f"Queued event import for Bibliotheca collection '{collection.name}'." ) + + +class ImportPurchaseRecordCollection(Script): + """Manually kick off the Bibliotheca purchase record import for one or all collections.""" + + @classmethod + def arg_parser(cls, _db: Session) -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Kick off the Bibliotheca purchase record import Celery task." + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "--collection", + type=str, + metavar="NAME", + help="Name of the Bibliotheca collection to import.", + ) + group.add_argument( + "--import-all", + action="store_true", + help="Queue the purchase record import for every Bibliotheca collection.", + ) + return parser + + def do_run(self, cmd_args: list[str] | None = None) -> None: + parsed = self.parse_command_line(self._db, cmd_args=cmd_args) + + if parsed.import_all: + bibliotheca.import_purchase_records_for_all_collections.delay() + self.log.info( + "Queued purchase record import for all Bibliotheca collections." + ) + return + + collection = Collection.by_name(self._db, parsed.collection) + if not collection: + raise PalaceValueError(f'No collection found named "{parsed.collection}".') + + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ) + self.log.info( + f"Queued purchase record import for Bibliotheca collection '{collection.name}'." + ) diff --git a/src/palace/manager/service/celery/celery.py b/src/palace/manager/service/celery/celery.py index 09cf47ed36..69818de431 100644 --- a/src/palace/manager/service/celery/celery.py +++ b/src/palace/manager/service/celery/celery.py @@ -300,6 +300,10 @@ def beat_schedule() -> dict[str, Any]: "task": bibliotheca.import_all_collections.name, "schedule": crontab(minute="0"), # Once an hour }, + "bibliotheca_import_purchase_records_for_all_collections": { + "task": bibliotheca.import_purchase_records_for_all_collections.name, + "schedule": crontab(minute="0", hour="4"), # Once a day at 4:00 AM + }, } diff --git a/tests/manager/celery/tasks/test_bibliotheca.py b/tests/manager/celery/tasks/test_bibliotheca.py index 045445ffcf..d46734335a 100644 --- a/tests/manager/celery/tasks/test_bibliotheca.py +++ b/tests/manager/celery/tasks/test_bibliotheca.py @@ -8,15 +8,19 @@ import pytest -from palace.util.datetime_helpers import utc_now +from palace.util.datetime_helpers import datetime_utc, utc_now from palace.util.log import LogLevel from palace.manager.celery.importer import import_workflow_lock from palace.manager.celery.tasks import apply, bibliotheca +from palace.manager.celery.tasks.bibliotheca import _purchase_record_workflow_lock from palace.manager.integration.license.bibliotheca import BibliothecaAPI from palace.manager.integration.license.bibliotheca_importer import ( EVENT_IMPORT_SERVICE_NAME, ) +from palace.manager.integration.license.bibliotheca_purchase_record_importer import ( + PURCHASE_RECORD_SERVICE_NAME, +) from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.collection import Collection from palace.manager.sqlalchemy.model.coverage import Timestamp @@ -513,3 +517,488 @@ def test_multiple_collections_each_get_own_lock( assert mock_api_cls2.return_value.get_events_between.call_count == 1 lock_c1.release() + + +# --------------------------------------------------------------------------- +# Purchase record importer tests +# --------------------------------------------------------------------------- + + +class BibliothecaPurchaseRecordTaskFixture: + """Common setup for Bibliotheca purchase record importer Celery task tests.""" + + def __init__(self, db: DatabaseTransactionFixture) -> None: + self.db = db + self.collection = MockBibliothecaAPI.mock_collection( + db.session, db.default_library() + ) + + def stamp_purchase_record( + self, finish: datetime | None = None, collection: Collection | None = None + ) -> Timestamp: + """Create (or update) the purchase record Timestamp for the collection.""" + return Timestamp.stamp( + self.db.session, + service=PURCHASE_RECORD_SERVICE_NAME, + service_type=Timestamp.MONITOR_TYPE, + collection=collection or self.collection, + finish=finish or utc_now(), + ) + + def get_purchase_record_timestamp( + self, collection: Collection | None = None + ) -> Timestamp | None: + return Timestamp.lookup( + self.db.session, + PURCHASE_RECORD_SERVICE_NAME, + Timestamp.MONITOR_TYPE, + collection or self.collection, + ) + + +@pytest.fixture +def bibliotheca_purchase_record_task_fixture( + db: DatabaseTransactionFixture, +) -> BibliothecaPurchaseRecordTaskFixture: + return BibliothecaPurchaseRecordTaskFixture(db) + + +class TestImportPurchaseRecordsForAllCollections: + def test_queues_purchase_record_collection_for_each_bibliotheca_collection( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + ) -> None: + """import_purchase_records_for_all_collections queues import_purchase_records_by_collection for every Bibliotheca + collection and ignores collections using other protocols.""" + db.default_collection() # non-Bibliotheca, should be ignored + c1 = MockBibliothecaAPI.mock_collection( + db.session, db.default_library(), name="Bibliotheca 1" + ) + c2 = MockBibliothecaAPI.mock_collection( + db.session, db.default_library(), name="Bibliotheca 2" + ) + + with patch.object( + bibliotheca, "import_purchase_records_by_collection" + ) as mock_task: + bibliotheca.import_purchase_records_for_all_collections.delay().wait() + + mock_task.delay.assert_has_calls( + [call(collection_id=c1.id), call(collection_id=c2.id)], + any_order=True, + ) + assert mock_task.delay.call_count == 2 + + def test_no_bibliotheca_collections( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + ) -> None: + """import_purchase_records_for_all_collections is a no-op when there are no Bibliotheca collections.""" + db.default_collection() # non-Bibliotheca + + with patch.object( + bibliotheca, "import_purchase_records_by_collection" + ) as mock_task: + bibliotheca.import_purchase_records_for_all_collections.delay().wait() + + mock_task.delay.assert_not_called() + + +class TestImportPurchaseRecordsByCollection: + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_first_run_starts_from_default_start_time( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """On the first run (no prior Timestamp), the task starts from 2014-01-01.""" + mock_api = mock_api_cls.return_value + mock_api.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + # Stamp a timestamp so we can check the start passed to marc_request. + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + + call_args = mock_api.marc_request.call_args_list[0] + slice_start, _ = call_args.args[:2] + expected_start = datetime_utc(2014, 1, 1) + assert abs((slice_start - expected_start).total_seconds()) < 1 + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_starts_from_stored_timestamp( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """The task starts from timestamp.finish when a prior Timestamp exists.""" + mock_api = mock_api_cls.return_value + mock_api.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + stored_finish = datetime_utc(2024, 3, 10) + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=stored_finish + ) + + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + + call_args = mock_api.marc_request.call_args_list[0] + slice_start, _ = call_args.args[:2] + assert abs((slice_start - stored_finish).total_seconds()) < 1 + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_already_up_to_date( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """When the stored Timestamp is current (finish >= now), no API call is made.""" + collection = bibliotheca_purchase_record_task_fixture.collection + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() + timedelta(minutes=10) + ) + + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + + mock_api_cls.return_value.marc_request.assert_not_called() + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_replaces_when_more_days_remain( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """When current_day + 1 day is still behind cutoff, task.replace() is raised + with the next day and the same lock_value.""" + mock_api_cls.return_value.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + # Stamp a timestamp several days in the past so multiple days are needed. + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() - timedelta(days=5) + ) + + lock_value = str(uuid4()) + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id, + lock_value=lock_value, + ).wait() + + replace_sig = mock_replace.call_args[0][0] + assert replace_sig.kwargs["lock_value"] == lock_value + assert replace_sig.kwargs["current_day"] is not None + assert replace_sig.kwargs["offset"] == 1 + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_replaces_with_next_offset_when_page_full( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """When a full page (50 records) is returned, task.replace() is called with + the same current_day and the next offset rather than advancing to the next day. + """ + from palace.manager.integration.license.bibliotheca_purchase_record_importer import ( + _MARC_PAGE_SIZE, + ) + + full_page_records = [MagicMock() for _ in range(_MARC_PAGE_SIZE)] + for record in full_page_records: + record.fields = [] + record.as_json.return_value = "{}" + + mock_api_cls.return_value.marc_request.return_value = iter(full_page_records) + collection = bibliotheca_purchase_record_task_fixture.collection + + stored_finish = utc_now() - timedelta(days=5) + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=stored_finish + ) + + lock_value = str(uuid4()) + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id, + offset=1, + lock_value=lock_value, + ).wait() + + replace_sig = mock_replace.call_args[0][0] + # Same day, offset advanced by _MARC_PAGE_SIZE. + assert ( + abs((replace_sig.kwargs["current_day"] - stored_finish).total_seconds()) < 5 + ) + assert replace_sig.kwargs["offset"] == 1 + _MARC_PAGE_SIZE + assert replace_sig.kwargs["lock_value"] == lock_value + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_no_replace_on_last_day( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """When current_day + 1 day >= cutoff, task.replace() is not called.""" + mock_api_cls.return_value.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + # Timestamp just a few hours old — fits inside one day. + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() - timedelta(hours=3) + ) + + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + + mock_replace.assert_not_called() + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_lock_value_passed_through_on_replace( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """The lock_value generated on the first day is forwarded unchanged to the + next day so the workflow lock remains held across task.replace() calls.""" + mock_api_cls.return_value.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() - timedelta(days=5) + ) + + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + + replace_sig = mock_replace.call_args[0][0] + lock_value = replace_sig.kwargs["lock_value"] + assert lock_value is not None + assert isinstance(lock_value, str) + + def test_skips_when_lock_held( + self, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: pytest.LogCaptureFixture, + ) -> None: + """When the purchase record workflow lock is already held, the task logs a warning + and returns without making any API calls.""" + collection = bibliotheca_purchase_record_task_fixture.collection + + existing_lock = _purchase_record_workflow_lock( + redis_fixture.client, collection.id, str(uuid4()) + ) + existing_lock.acquire() + + caplog.set_level(LogLevel.warning) + + with patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) as mock_api_cls: + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).wait() + mock_api_cls.return_value.marc_request.assert_not_called() + + assert "skipped" in caplog.text + assert "already in progress" in caplog.text + + existing_lock.release() + + def test_continues_with_warning_when_lock_expires_mid_chain( + self, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + caplog: pytest.LogCaptureFixture, + ) -> None: + """When the purchase record workflow lock expires between days (is_first_day=False + and lock not acquired), the task logs a warning but still processes the day.""" + collection = bibliotheca_purchase_record_task_fixture.collection + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() - timedelta(hours=3) + ) + + competing_lock = _purchase_record_workflow_lock( + redis_fixture.client, collection.id, str(uuid4()) + ) + competing_lock.acquire() + + caplog.set_level(LogLevel.warning) + + with patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) as mock_api_cls: + mock_api_cls.return_value.marc_request.return_value = iter([]) + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id, + lock_value=str(uuid4()), + ).wait() + mock_api_cls.return_value.marc_request.assert_called_once() + + assert "workflow lock expired between days" in caplog.text + + competing_lock.release() + + def test_lock_not_released_on_autoretry( + self, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """When a retryable exception is raised the purchase record workflow lock is held so + that a concurrent run cannot start on the same collection while retries are + in progress.""" + collection = bibliotheca_purchase_record_task_fixture.collection + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=utc_now() - timedelta(days=5) + ) + + mock_response = MockRequestsResponse(500, content="Internal Server Error") + + with patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) as mock_api_cls: + mock_api_cls.return_value.marc_request.side_effect = BadResponseException( + "http://test.com", "Bad response", mock_response + ) + + with celery_fixture.patch_retry_backoff(): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id + ).get(propagate=False) + + workflow_lock = _purchase_record_workflow_lock( + redis_fixture.client, collection.id, random_value="any" + ) + assert workflow_lock.locked() + + @patch( + "palace.manager.integration.license.bibliotheca_purchase_record_importer.BibliothecaAPI" + ) + def test_timestamp_updated_after_each_day( + self, + mock_api_cls: MagicMock, + bibliotheca_purchase_record_task_fixture: BibliothecaPurchaseRecordTaskFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """The Timestamp.finish is advanced by one day after each task invocation, + enabling crash recovery without re-processing old records.""" + mock_api_cls.return_value.marc_request.return_value = iter([]) + collection = bibliotheca_purchase_record_task_fixture.collection + + stored_finish = datetime_utc(2024, 3, 10) + bibliotheca_purchase_record_task_fixture.stamp_purchase_record( + finish=stored_finish + ) + + lock_value = str(uuid4()) + with patch.object( + bibliotheca.import_purchase_records_by_collection, "replace" + ) as mock_replace: + mock_replace.side_effect = Exception("replaced") + with pytest.raises(Exception, match="replaced"): + bibliotheca.import_purchase_records_by_collection.delay( + collection_id=collection.id, + lock_value=lock_value, + ).wait() + + ts = bibliotheca_purchase_record_task_fixture.get_purchase_record_timestamp() + assert ts is not None + assert ts.finish is not None + expected_finish = stored_finish + timedelta(days=1) + assert abs((ts.finish - expected_finish).total_seconds()) < 5 + + def test_purchase_record_lock_independent_from_import_lock( + self, + db: DatabaseTransactionFixture, + celery_fixture: CeleryFixture, + redis_fixture: RedisFixture, + ) -> None: + """The purchase record workflow lock uses a different Redis key than the event + import workflow lock, so the two can run concurrently per collection.""" + collection = MockBibliothecaAPI.mock_collection( + db.session, db.default_library() + ) + + # Hold the event import lock. + event_lock = import_workflow_lock( + redis_fixture.client, collection.id, str(uuid4()) + ) + event_lock.acquire() + + # The purchase lock for the same collection should still be acquirable. + purchase_record_lock = _purchase_record_workflow_lock( + redis_fixture.client, collection.id, str(uuid4()) + ) + acquired = purchase_record_lock.acquire() + assert acquired + + purchase_record_lock.release() + event_lock.release() diff --git a/tests/manager/integration/license/test_bibliotheca.py b/tests/manager/integration/license/test_bibliotheca.py index 3a3265fd23..2a7f980eae 100644 --- a/tests/manager/integration/license/test_bibliotheca.py +++ b/tests/manager/integration/license/test_bibliotheca.py @@ -2,14 +2,11 @@ import json import random -from datetime import date, datetime, timedelta -from io import BytesIO, StringIO +from datetime import date, timedelta from typing import TYPE_CHECKING, cast -from unittest import mock -from unittest.mock import MagicMock, create_autospec +from unittest.mock import create_autospec import pytest -from pymarc import parse_xml_to_array from pymarc.record import Record from palace.util.datetime_helpers import datetime_utc, utc_now @@ -32,13 +29,11 @@ ) from palace.manager.api.circulation.fulfillment import Fulfillment from palace.manager.api.web_publication_manifest import FindawayManifest -from palace.manager.core.monitor import TimestampData from palace.manager.integration.license.bibliotheca import ( BibliothecaAPI, BibliothecaBibliographicCoverageProvider, BibliothecaCirculationSweep, BibliothecaParser, - BibliothecaPurchaseMonitor, CheckoutResponseParser, ErrorParser, EventParser, @@ -49,19 +44,16 @@ from palace.manager.sqlalchemy.model.circulationevent import CirculationEvent from palace.manager.sqlalchemy.model.classification import Subject from palace.manager.sqlalchemy.model.contributor import Contributor -from palace.manager.sqlalchemy.model.coverage import Timestamp from palace.manager.sqlalchemy.model.datasource import DataSource from palace.manager.sqlalchemy.model.edition import Edition from palace.manager.sqlalchemy.model.identifier import Identifier from palace.manager.sqlalchemy.model.licensing import ( DeliveryMechanism, - LicensePool, LicensePoolDeliveryMechanism, LicensePoolStatus, ) from palace.manager.sqlalchemy.model.measurement import Measurement from palace.manager.sqlalchemy.model.resource import Hyperlink, Representation -from palace.manager.sqlalchemy.model.work import Work from palace.manager.util.http.exception import ( BadResponseException, RemoteIntegrationException, @@ -1022,529 +1014,6 @@ def test_parse_event_batch(self): assert correct_end == end_time -class TestBibliothecaPurchaseMonitor: - @pytest.fixture() - def default_monitor(self, bibliotheca_fixture: BibliothecaAPITestFixture): - return BibliothecaPurchaseMonitor( - bibliotheca_fixture.db.session, - bibliotheca_fixture.collection, - api_class=MockBibliothecaAPI, - ) - - @pytest.fixture() - def initialized_monitor(self, db: DatabaseTransactionFixture): - collection = MockBibliothecaAPI.mock_collection( - db.session, - db.default_library(), - name="Initialized Purchase Monitor Collection", - ) - monitor = BibliothecaPurchaseMonitor( - db.session, collection, api_class=MockBibliothecaAPI - ) - Timestamp.stamp( - db.session, - service=monitor.service_name, - service_type=Timestamp.MONITOR_TYPE, - collection=collection, - ) - return monitor - - @pytest.mark.parametrize( - "specified_default_start, expected_default_start", - [ - ("2011", datetime_utc(year=2011, month=1, day=1)), - ("2011-10", datetime_utc(year=2011, month=10, day=1)), - ("2011-10-05", datetime_utc(year=2011, month=10, day=5)), - ("2011-10-05T15", datetime_utc(year=2011, month=10, day=5, hour=15)), - ( - "2011-10-05T15:27", - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - ( - "2011-10-05T15:27:33", - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - ( - "2011-10-05 15:27:33", - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - ( - "2011-10-05T15:27:33.123456", - datetime_utc( - year=2011, - month=10, - day=5, - hour=15, - minute=27, - second=33, - microsecond=123456, - ), - ), - ( - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - (None, None), - ], - ) - def test_optional_iso_date_valid_dates( - self, - specified_default_start: datetime | str | None, - expected_default_start: datetime | None, - default_monitor: BibliothecaPurchaseMonitor, - ): - # ISO 8601 strings, `datetime`s, or None are valid. - actual_default_start = default_monitor._optional_iso_date( - specified_default_start - ) - if expected_default_start is not None: - assert isinstance(actual_default_start, datetime) - assert actual_default_start == expected_default_start - - def test_monitor_intrinsic_start_time( - self, - default_monitor: BibliothecaPurchaseMonitor, - initialized_monitor: BibliothecaPurchaseMonitor, - bibliotheca_fixture: BibliothecaAPITestFixture, - ): - db = bibliotheca_fixture.db - # No `default_start` time is specified for either `default_monitor` or - # `initialized_monitor`, so each monitor's `default_start_time` should - # match the monitor class's intrinsic start time. - for monitor in [default_monitor, initialized_monitor]: - expected_intrinsic_start = BibliothecaPurchaseMonitor.DEFAULT_START_TIME - intrinsic_start = monitor._intrinsic_start_time(db.session) - assert isinstance(intrinsic_start, datetime) - assert intrinsic_start == expected_intrinsic_start - assert intrinsic_start == monitor.default_start_time - - @pytest.mark.parametrize( - "specified_default_start, override_timestamp, expected_start", - [ - ( - "2011-10-05T15:27", - False, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - ( - "2011-10-05T15:27:33", - False, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - (None, False, None), - (None, True, None), - ( - "2011-10-05T15:27", - True, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - ( - "2011-10-05T15:27:33", - True, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - ], - ) - def test_specified_start_trumps_intrinsic_default_start( - self, - specified_default_start: str | None, - override_timestamp: bool, - expected_start: datetime | None, - bibliotheca_fixture: BibliothecaAPITestFixture, - ): - db = bibliotheca_fixture.db - # When a valid `default_start` parameter is specified, it -- not the monitor's - # intrinsic default -- will always become the monitor's `default_start_time`. - monitor = BibliothecaPurchaseMonitor( - db.session, - bibliotheca_fixture.collection, - api_class=MockBibliothecaAPI, - default_start=specified_default_start, - override_timestamp=override_timestamp, - ) - monitor_intrinsic_default = monitor._intrinsic_start_time(db.session) - assert isinstance(monitor.default_start_time, datetime) - assert isinstance(monitor_intrinsic_default, datetime) - if specified_default_start: - assert monitor.default_start_time == expected_start - else: - assert ( - abs( - ( - monitor_intrinsic_default - monitor.default_start_time - ).total_seconds() - ) - <= 1 - ) - - # If no `default_date` specified, then `override_timestamp` must be false. - if not specified_default_start: - assert monitor.override_timestamp is False - - # For an uninitialized monitor (no timestamp), the monitor's `default_start_time`, - # whether from a specified `default_start` or the monitor's intrinsic start time, - # will be the actual start time. The cut-off will be roughly the current time, in - # either case. - expected_cutoff = utc_now() - with mock.patch.object( - monitor, "catch_up_from", return_value=None - ) as catch_up_from: - monitor.run() - actual_start, actual_cutoff, progress = catch_up_from.call_args[0] - assert abs((expected_cutoff - actual_cutoff).total_seconds()) <= 1 - assert actual_cutoff == progress.finish - assert actual_start == monitor.default_start_time - assert progress.start == monitor.default_start_time - - @pytest.mark.parametrize( - "specified_default_start, override_timestamp, expected_start", - [ - ( - "2011-10-05T15:27", - False, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - ( - "2011-10-05T15:27:33", - False, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - (None, False, None), - (None, True, None), - ( - "2011-10-05T15:27", - True, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27), - ), - ( - "2011-10-05T15:27:33", - True, - datetime_utc(year=2011, month=10, day=5, hour=15, minute=27, second=33), - ), - ], - ) - def test_specified_start_can_override_timestamp( - self, - specified_default_start: str | None, - override_timestamp: bool, - expected_start: datetime | None, - bibliotheca_fixture: BibliothecaAPITestFixture, - ): - monitor = BibliothecaPurchaseMonitor( - bibliotheca_fixture.db.session, - bibliotheca_fixture.collection, - api_class=MockBibliothecaAPI, - default_start=specified_default_start, - override_timestamp=override_timestamp, - ) - # For an initialized monitor, the `default_start_time` will be derived from - # `timestamp.finish`, unless overridden by a specified `default_start` when - # `override_timestamp` is specified as True. - ts = Timestamp.stamp( - bibliotheca_fixture.db.session, - service=monitor.service_name, - service_type=Timestamp.MONITOR_TYPE, - collection=monitor.collection, - ) - assert isinstance(ts.finish, datetime) - start_time_from_ts = ts.finish - BibliothecaPurchaseMonitor.OVERLAP - expected_actual_start_time = ( - expected_start if monitor.override_timestamp else start_time_from_ts - ) - expected_cutoff = utc_now() - with mock.patch.object( - monitor, "catch_up_from", return_value=None - ) as catch_up_from: - monitor.run() - actual_start, actual_cutoff, progress = catch_up_from.call_args[0] - assert abs((expected_cutoff - actual_cutoff).total_seconds()) <= 1 - assert actual_cutoff == progress.finish - assert actual_start == expected_actual_start_time - assert progress.start == expected_actual_start_time - - @pytest.mark.parametrize("input", [("invalid"), ("2020/10"), (["2020-10-05"])]) - def test_optional_iso_date_invalid_dates( - self, - input: list[str] | str, - default_monitor: BibliothecaPurchaseMonitor, - ): - with pytest.raises(ValueError) as excinfo: - default_monitor._optional_iso_date(input) # type: ignore[arg-type] - - def test_catch_up_from(self, default_monitor: BibliothecaPurchaseMonitor): - # catch_up_from() slices up its given timespan, calls - # purchases() to find purchases for each slice, processes each - # purchase using process_record(), and sets a checkpoint for each - # slice that is unambiguously in the past. - today = utc_now().date() - - # _checkpoint() will be called after processing this slice - # because it's a full slice that ends before today. - full_slice = [datetime_utc(2014, 1, 1), datetime_utc(2014, 1, 2), True] - - # _checkpoint() is not called after processing this slice - # because it's not a full slice. - incomplete_slice = [datetime_utc(2015, 1, 1), datetime_utc(2015, 1, 2), False] - - # _checkpoint() is not called after processing this slice, - # even though it's supposedly complete, because today isn't - # over yet. - today_slice = [today - timedelta(days=1), today, True] - - # _checkpoint() is not called after processing this slice - # because it doesn't end in the past. - future_slice = [today + timedelta(days=1), today + timedelta(days=2), True] - - default_monitor.slice_timespan = MagicMock( - return_value=[full_slice, incomplete_slice, today_slice, future_slice] - ) - default_monitor.purchases = MagicMock(return_value=["A record"]) - default_monitor.process_record = MagicMock() - default_monitor._checkpoint = MagicMock() - - # Execute. - progress = TimestampData() - start = datetime_utc(2019, 1, 1) - cutoff = datetime_utc(2020, 1, 1) - default_monitor.catch_up_from(start, cutoff, progress) - - # slice_timespan was called once. - default_monitor.slice_timespan.assert_called_once_with( - start, cutoff, timedelta(days=1) - ) - - # purchases() was called on each slice it returned. - default_monitor.purchases.assert_has_calls( - [ - mock.call(*x[:2]) - for x in (full_slice, incomplete_slice, today_slice, future_slice) - ] - ) - - # Each purchases() call returned a single record, which was - # passed into process_record along with the start date of the - # current slice. - default_monitor.process_record.assert_has_calls( - [ - mock.call("A record", x[0]) - for x in [full_slice, incomplete_slice, today_slice, future_slice] - ] - ) - - # TimestampData.achievements was set to the total number of - # records processed. - assert progress.achievements == "MARC records processed: 4" - - # Only one of our contrived time slices -- the first one -- - # was a full slice that ended before the current - # date. _checkpoint was called on that slice, and only that - # slice. - default_monitor._checkpoint.assert_called_once_with( - progress, start, full_slice[0], "MARC records processed: 1" - ) - - def test__checkpoint(self, default_monitor: BibliothecaPurchaseMonitor): - # The _checkpoint method allows the BibliothecaPurchaseMonitor - # to preserve its progress in case of a crash. - - # The Timestamp for the default monitor shows that it has - # a start date but it's never successfully completed. - timestamp_obj = default_monitor.timestamp() - assert timestamp_obj.achievements is None - assert timestamp_obj.start == BibliothecaPurchaseMonitor.DEFAULT_START_TIME - assert timestamp_obj.finish is None - - timestamp_data = TimestampData() - finish = datetime_utc(2020, 1, 1) - achievements = "Some achievements" - - default_monitor._checkpoint( - timestamp_data, timestamp_obj.start, finish, achievements - ) - - # Calling _checkpoint creates the impression that the monitor - # completed at the checkpoint, even though in point of fact - # it's still running. - timestamp_obj = default_monitor.timestamp() - assert timestamp_obj.achievements == achievements - assert timestamp_obj.start == BibliothecaPurchaseMonitor.DEFAULT_START_TIME - assert timestamp_obj.finish == finish - - def test_purchases(self, default_monitor: BibliothecaPurchaseMonitor): - # The purchases() method calls marc_request repeatedly, handling - # pagination. - - # Mock three pages that contain 50, 50, and 49 items. - default_monitor.api.marc_request = MagicMock( - side_effect=[[1] * 50, [2] * 50, [3] * 49] - ) - start = datetime_utc(2020, 1, 1) - end = datetime_utc(2020, 1, 2) - records = [x for x in default_monitor.purchases(start, end)] - - # marc_request was called repeatedly with increasing offsets - # until it returned fewer than 50 results. - default_monitor.api.marc_request.assert_has_calls( - [mock.call(start, end, offset, 50) for offset in (1, 51, 101)] - ) - - # Every "record" it returned was yielded as part of a single - # stream. - assert ([1] * 50) + ([2] * 50) + ([3] * 49) == records - - def test_process_record( - self, - default_monitor: BibliothecaPurchaseMonitor, - caplog: pytest.LogCaptureFixture, - bibliotheca_fixture: BibliothecaAPITestFixture, - ): - # process_record may create a LicensePool, trigger the - # bibliographic coverage provider, and/or issue a "license - # added" analytics event, based on the identifier found in a - # MARC record. - purchase_time = utc_now() - ensure_coverage = MagicMock() - default_monitor.bibliographic_coverage_provider.ensure_coverage = ( - ensure_coverage - ) - - # Try some cases that won't happen in real life. - multiple_control_numbers = b"""01034nam a22002413a 4500ehasb89abcde""" - no_control_number = b"""01034nam a22002413a 4500""" - for bad_record, expect_error in ( - ( - multiple_control_numbers, - "Ignoring MARC record with multiple Bibliotheca control numbers.", - ), - ( - no_control_number, - "Ignoring MARC record with no Bibliotheca control number.", - ), - ): - [marc] = parse_xml_to_array(BytesIO(bad_record)) - assert default_monitor.process_record(marc, purchase_time) is None - assert expect_error in caplog.messages[-1] - - # Now, try the two real cases. - [ehasb89, oock89] = parse_xml_to_array( - StringIO( - bibliotheca_fixture.files.sample_data("marc_records_two.xml").decode( - "utf8" - ) - ) - ) - - # If the book is new to this collection, it's run through - # BibliothecaBibliographicCoverageProvider.ensure_coverage to - # give it initial bibliographic and circulation data. - pool = default_monitor.process_record(ehasb89, purchase_time) - assert pool.identifier.identifier == "ehasb89" - assert pool.identifier.type == Identifier.BIBLIOTHECA_ID - assert pool.data_source.name == DataSource.BIBLIOTHECA - assert bibliotheca_fixture.collection == pool.collection - ensure_coverage.assert_called_once_with(pool.identifier, force=True) - - # If the book is already in this collection, ensure_coverage - # is not called. - pool, ignore = LicensePool.for_foreign_id( - bibliotheca_fixture.db.session, - DataSource.BIBLIOTHECA, - Identifier.BIBLIOTHECA_ID, - "3oock89", - collection=bibliotheca_fixture.collection, - ) - pool2 = default_monitor.process_record(oock89, purchase_time) - assert pool == pool2 - assert ensure_coverage.call_count == 1 # i.e. was not called again. - - def test_end_to_end( - self, - default_monitor: BibliothecaPurchaseMonitor, - bibliotheca_fixture: BibliothecaAPITestFixture, - ): - # Limited end-to-end test of the BibliothecaPurchaseMonitor. - - # Set the default start time to one minute in the past, so the - # monitor doesn't feel the need to make more than one call to - # the MARC endpoint. - default_monitor.override_timestamp = True - start_time = utc_now() - timedelta(minutes=1) - default_monitor.default_start_time = start_time - - # There will be two calls to the mock API: one to the MARC - # endpoint, which will tell us about the purchase of a single - # book, and one to the metadata endpoint for information about - # that book. - api = cast(MockBibliothecaAPI, default_monitor.api) - api.queue_response( - 200, content=bibliotheca_fixture.files.sample_data("marc_records_one.xml") - ) - api.queue_response( - 200, - content=bibliotheca_fixture.files.sample_data("item_metadata_single.xml"), - ) - default_monitor.run() - - # One book was created. - work = bibliotheca_fixture.db.session.query(Work).one() - - # Bibliographic information came from the coverage provider, - # not from our fake MARC record (which is actually for a - # different book). - assert work.title == "The Incense Game" - - # Licensing information was also taken from the coverage - # provider. - [lp] = work.license_pools - assert lp.identifier.identifier == "ddf4gr9" - assert default_monitor.collection == lp.collection - assert lp.licenses_owned == 1 - assert lp.licenses_available == 1 - - # The timestamp has been updated; the next time the monitor - # runs it will ask for purchases that haven't happened yet. - default_monitor.override_timestamp = False - timestamp = default_monitor.timestamp() - assert timestamp.achievements == "MARC records processed: 1" - assert timestamp.finish is not None - assert timestamp.finish > start_time - - -class TestBibliothecaPurchaseMonitorWhenMultipleCollections: - def test_multiple_service_type_timestamps_with_start_date( - self, bibliotheca_fixture: BibliothecaAPITestFixture - ): - db = bibliotheca_fixture.db - # Start with multiple collections that have timestamps - # because they've run before. - collections = [ - MockBibliothecaAPI.mock_collection( - db.session, db.default_library(), name="Collection 1" - ), - MockBibliothecaAPI.mock_collection( - db.session, db.default_library(), name="Collection 2" - ), - ] - for c in collections: - Timestamp.stamp( - db.session, - service=BibliothecaPurchaseMonitor.SERVICE_NAME, - service_type=Timestamp.MONITOR_TYPE, - collection=c, - ) - # Instantiate the associated monitors with a start date. - monitors = [ - BibliothecaPurchaseMonitor( - db.session, c, api_class=BibliothecaAPI, default_start="2011-02-03" - ) - for c in collections - ] - assert len(monitors) == len(collections) - # Ensure that we get monitors and not an exception. - for m in monitors: - assert isinstance(m, BibliothecaPurchaseMonitor) - - class TestItemListParser: def test_contributors_for_string(cls): authors = list( diff --git a/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py b/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py new file mode 100644 index 0000000000..c474c2e040 --- /dev/null +++ b/tests/manager/integration/license/test_bibliotheca_purchase_record_importer.py @@ -0,0 +1,318 @@ +"""Unit tests for BibliothecaPurchaseRecordImporter.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import MagicMock, patch + +from palace.util.datetime_helpers import datetime_utc, utc_now + +from palace.manager.celery.tasks import apply +from palace.manager.integration.license.bibliotheca import BibliothecaAPI +from palace.manager.integration.license.bibliotheca_purchase_record_importer import ( + _MARC_PAGE_SIZE, + DEFAULT_PURCHASE_RECORD_START_TIME, + PURCHASE_RECORD_SERVICE_NAME, + BibliothecaPurchaseRecordImporter, + DayImportResult, +) +from palace.manager.sqlalchemy.model.coverage import Timestamp +from tests.fixtures.database import DatabaseTransactionFixture +from tests.mocks.bibliotheca import MockBibliothecaAPI + + +def _make_importer( + db: DatabaseTransactionFixture, + api: MockBibliothecaAPI | None = None, +) -> tuple[BibliothecaPurchaseRecordImporter, MockBibliothecaAPI]: + """Return an importer and its bound API for the default test collection.""" + collection = MockBibliothecaAPI.mock_collection(db.session, db.default_library()) + mock_api = api or MockBibliothecaAPI(db.session, collection) + importer = BibliothecaPurchaseRecordImporter(db.session, collection, api=mock_api) + return importer, mock_api + + +def _fake_marc_record(bibliotheca_id: str = "d5rf89") -> MagicMock: + """Return a minimal mock pymarc Record with a single 001 control field.""" + field = MagicMock() + field.tag = "001" + field.value.return_value = bibliotheca_id + record = MagicMock() + record.fields = [field] + return record + + +class TestBibliothecaPurchaseRecordImporterGetStart: + def test_no_prior_timestamp_returns_default_start( + self, db: DatabaseTransactionFixture + ) -> None: + """With no stored Timestamp, get_start returns DEFAULT_PURCHASE_RECORD_START_TIME.""" + importer, _ = _make_importer(db) + start = importer.get_start() + assert start == DEFAULT_PURCHASE_RECORD_START_TIME + + def test_prior_timestamp_returns_its_finish( + self, db: DatabaseTransactionFixture + ) -> None: + """With a stored Timestamp, get_start returns timestamp.finish.""" + importer, mock_api = _make_importer(db) + collection = mock_api.collection + finish = utc_now() - timedelta(days=30) + Timestamp.stamp( + db.session, + service=PURCHASE_RECORD_SERVICE_NAME, + service_type=Timestamp.MONITOR_TYPE, + collection=collection, + finish=finish, + ) + + start = importer.get_start() + assert abs((start - finish).total_seconds()) < 1 + + +class TestBibliothecaPurchaseRecordImporterImportDay: + def test_returns_day_result_with_correct_bounds( + self, db: DatabaseTransactionFixture + ) -> None: + """import_day returns a DayImportResult with the correct window.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])): + result = importer.import_day(current_day, cutoff) + + assert isinstance(result, DayImportResult) + assert result.day_start == current_day + expected_end = current_day + timedelta(days=1) + assert abs((result.day_end - expected_end).total_seconds()) < 1 + assert result.records_handled == 0 + + def test_day_end_capped_at_cutoff(self, db: DatabaseTransactionFixture) -> None: + """When current_day + 1 day > cutoff, day_end is capped at cutoff.""" + importer, _ = _make_importer(db) + cutoff = utc_now() + # Start just a few hours before cutoff — less than a full day. + current_day = cutoff - timedelta(hours=3) + + with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])): + result = importer.import_day(current_day, cutoff) + + assert abs((result.day_end - cutoff).total_seconds()) < 1 + + def test_stamps_timestamp_to_day_end_when_day_complete( + self, db: DatabaseTransactionFixture + ) -> None: + """A partial page (day complete) stamps Timestamp.finish to day_end.""" + importer, mock_api = _make_importer(db) + collection = mock_api.collection + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with patch.object(BibliothecaAPI, "marc_request", return_value=iter([])): + result = importer.import_day(current_day, cutoff) + + ts = Timestamp.lookup( + db.session, PURCHASE_RECORD_SERVICE_NAME, Timestamp.MONITOR_TYPE, collection + ) + assert ts is not None + assert ts.finish is not None + assert abs((ts.finish - result.day_end).total_seconds()) < 1 + + def test_stamps_timestamp_to_current_day_when_page_full( + self, db: DatabaseTransactionFixture + ) -> None: + """A full page (day still in progress) stamps Timestamp.finish to current_day.""" + importer, mock_api = _make_importer(db) + collection = mock_api.collection + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + full_page = [_fake_marc_record(f"item{i}") for i in range(_MARC_PAGE_SIZE)] + + with ( + patch.object(BibliothecaAPI, "marc_request", return_value=iter(full_page)), + patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]), + ): + result = importer.import_day(current_day, cutoff) + + assert result.next_offset is not None # day not yet complete + + ts = Timestamp.lookup( + db.session, PURCHASE_RECORD_SERVICE_NAME, Timestamp.MONITOR_TYPE, collection + ) + assert ts is not None + assert ts.finish is not None + assert abs((ts.finish - current_day).total_seconds()) < 1 + + def test_counts_records_handled(self, db: DatabaseTransactionFixture) -> None: + """records_handled in the result matches the number of records on the page.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + fake_records = [_fake_marc_record(f"item{i}") for i in range(3)] + + with ( + patch.object( + BibliothecaAPI, "marc_request", return_value=iter(fake_records) + ), + patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]), + ): + result = importer.import_day(current_day, cutoff) + + assert result.records_handled == 3 + + def test_returns_next_offset_when_page_is_full( + self, db: DatabaseTransactionFixture + ) -> None: + """A full page (50 records) sets next_offset to offset + _MARC_PAGE_SIZE.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + full_page = [_fake_marc_record(f"item{i}") for i in range(_MARC_PAGE_SIZE)] + + with ( + patch.object(BibliothecaAPI, "marc_request", return_value=iter(full_page)), + patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]), + ): + result = importer.import_day(current_day, cutoff, offset=1) + + assert result.next_offset == 1 + _MARC_PAGE_SIZE + + def test_returns_no_next_offset_when_page_is_partial( + self, db: DatabaseTransactionFixture + ) -> None: + """A partial page (fewer than 50 records) sets next_offset to None.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + partial_page = [_fake_marc_record(f"item{i}") for i in range(3)] + + with ( + patch.object( + BibliothecaAPI, "marc_request", return_value=iter(partial_page) + ), + patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]), + ): + result = importer.import_day(current_day, cutoff, offset=51) + + assert result.next_offset is None + + def test_passes_offset_to_marc_request( + self, db: DatabaseTransactionFixture + ) -> None: + """import_day forwards the offset parameter to marc_request.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with patch.object( + BibliothecaAPI, "marc_request", return_value=iter([]) + ) as mock_request: + importer.import_day(current_day, cutoff, offset=101) + + args, kwargs = mock_request.call_args + # marc_request(start, end, offset, limit) + assert args[2] == 101 + + +class TestBibliothecaPurchaseRecordImporterProcessRecord: + """Tests for _process_record logic, exercised via import_day.""" + + def test_creates_license_pool_for_valid_record( + self, db: DatabaseTransactionFixture + ) -> None: + """_process_record creates a LicensePool for a record with a valid 001 field.""" + importer, mock_api = _make_importer(db) + collection = mock_api.collection + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with ( + patch.object( + BibliothecaAPI, + "marc_request", + return_value=iter([_fake_marc_record("d5rf89")]), + ), + patch.object(BibliothecaAPI, "bibliographic_lookup", return_value=[]), + ): + importer.import_day(current_day, cutoff) + + pools = [ + lp for lp in collection.licensepools if lp.identifier.identifier == "d5rf89" + ] + assert len(pools) == 1 + + def test_skips_record_with_no_control_number( + self, db: DatabaseTransactionFixture + ) -> None: + """_process_record logs an error and skips records with no 001 field.""" + importer, _ = _make_importer(db) + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + # Record with no 001 field. + bad_record = MagicMock() + bad_record.fields = [] + bad_record.as_json.return_value = "{}" + + with patch.object( + BibliothecaAPI, "marc_request", return_value=iter([bad_record]) + ): + result = importer.import_day(current_day, cutoff) + + # The record was "handled" (iterated) but produced no LicensePool. + assert result.records_handled == 1 + + def test_queues_bibliographic_apply_when_needed( + self, db: DatabaseTransactionFixture + ) -> None: + """bibliographic_apply is queued when the bibliographic data has changed.""" + importer, _ = _make_importer(db) + mock_bib = MagicMock() + mock_bib.needs_apply.return_value = True + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with ( + patch.object( + BibliothecaAPI, + "marc_request", + return_value=iter([_fake_marc_record()]), + ), + patch.object( + BibliothecaAPI, "bibliographic_lookup", return_value=[mock_bib] + ), + patch.object(apply, "bibliographic_apply") as mock_apply, + ): + importer.import_day(current_day, cutoff) + + mock_apply.delay.assert_called_once() + + def test_skips_bibliographic_apply_when_not_needed( + self, db: DatabaseTransactionFixture + ) -> None: + """bibliographic_apply is not queued when the data is already up to date.""" + importer, _ = _make_importer(db) + mock_bib = MagicMock() + mock_bib.needs_apply.return_value = False + current_day = datetime_utc(2024, 1, 15) + cutoff = datetime_utc(2024, 1, 20) + + with ( + patch.object( + BibliothecaAPI, + "marc_request", + return_value=iter([_fake_marc_record()]), + ), + patch.object( + BibliothecaAPI, "bibliographic_lookup", return_value=[mock_bib] + ), + patch.object(apply, "bibliographic_apply") as mock_apply, + ): + importer.import_day(current_day, cutoff) + + mock_apply.delay.assert_not_called() diff --git a/tests/manager/integration/license/test_bibliotheca_scripts.py b/tests/manager/integration/license/test_bibliotheca_scripts.py index a10df4549f..fa69c3ba90 100644 --- a/tests/manager/integration/license/test_bibliotheca_scripts.py +++ b/tests/manager/integration/license/test_bibliotheca_scripts.py @@ -11,6 +11,7 @@ from palace.manager.celery.tasks import bibliotheca from palace.manager.integration.license.bibliotheca_scripts import ( ImportEventCollection, + ImportPurchaseRecordCollection, ) from tests.fixtures.database import DatabaseTransactionFixture from tests.mocks.bibliotheca import MockBibliothecaAPI @@ -55,3 +56,50 @@ def test_both_args_raises(self, db: DatabaseTransactionFixture) -> None: ImportEventCollection(db.session).do_run( ["--collection", collection.name, "--import-all"] ) + + +class TestImportPurchaseRecordCollection: + def test_import_all_queues_import_purchase_records_for_all_collections( + self, db: DatabaseTransactionFixture + ) -> None: + """--import-all dispatches import_purchase_records_for_all_collections.""" + with patch.object( + bibliotheca, "import_purchase_records_for_all_collections" + ) as mock_task: + ImportPurchaseRecordCollection(db.session).do_run(["--import-all"]) + mock_task.delay.assert_called_once_with() + + def test_collection_queues_import_purchase_records_by_collection( + self, db: DatabaseTransactionFixture + ) -> None: + """--collection dispatches import_purchase_records_by_collection for that collection.""" + collection = MockBibliothecaAPI.mock_collection( + db.session, db.default_library(), name="My Bibliotheca" + ) + with patch.object( + bibliotheca, "import_purchase_records_by_collection" + ) as mock_task: + ImportPurchaseRecordCollection(db.session).do_run( + ["--collection", collection.name] + ) + mock_task.delay.assert_called_once_with(collection_id=collection.id) + + def test_collection_not_found_raises(self, db: DatabaseTransactionFixture) -> None: + """--collection with an unknown name raises PalaceValueError.""" + with pytest.raises(PalaceValueError, match='No collection found named "Ghost"'): + ImportPurchaseRecordCollection(db.session).do_run(["--collection", "Ghost"]) + + def test_no_args_raises(self, db: DatabaseTransactionFixture) -> None: + """Omitting both --collection and --import-all is an error.""" + with pytest.raises(SystemExit): + ImportPurchaseRecordCollection(db.session).do_run([]) + + def test_both_args_raises(self, db: DatabaseTransactionFixture) -> None: + """Specifying both --collection and --import-all is an error.""" + collection = MockBibliothecaAPI.mock_collection( + db.session, db.default_library() + ) + with pytest.raises(SystemExit): + ImportPurchaseRecordCollection(db.session).do_run( + ["--collection", collection.name, "--import-all"] + )