Migrate BibliothecaPurchaseMonitor to Celery tasks #3385

Open
dbernstein wants to merge 6 commits into main from feature/bibliotheca-purchase-monitor-celery

Conversation

@dbernstein
Contributor

Description

Replaces the legacy script-driven BibliothecaPurchaseMonitor with a pair of Celery tasks that process one day of MARC purchase records per invocation and chain days via task.replace() until the collection is caught up to utc_now().
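
The chaining rule can be sketched as a pure function that decides which invocation comes next. This is a minimal illustration, not the code in this PR: `plan_next_step` and `NextStep` are hypothetical names, the page size of 50 and the starting offset of 1 come from the commit notes further down, and the "caught up" test against `cutoff` is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

PAGE_SIZE = 50  # mirrors the _MARC_PAGE_SIZE=50 mentioned in the commit notes


@dataclass(frozen=True)
class NextStep:
    """Hypothetical description of the follow-up invocation."""

    day: datetime
    offset: int


def plan_next_step(
    current_day: datetime,
    offset: int,
    records_handled: int,
    cutoff: datetime,
) -> NextStep | None:
    """Return the next (day, offset) to process, or None when caught up."""
    if records_handled >= PAGE_SIZE:
        # Full page: more records may remain for the same day.
        return NextStep(day=current_day, offset=offset + PAGE_SIZE)
    # Partial page: the day is done, so move on and reset the offset.
    next_day = current_day + timedelta(days=1)
    if next_day >= cutoff:  # assumed stop condition
        return None
    return NextStep(day=next_day, offset=1)


day = datetime(2026, 5, 20, tzinfo=timezone.utc)
cutoff = datetime(2026, 5, 22, tzinfo=timezone.utc)
print(plan_next_step(day, 1, 50, cutoff))   # same day, next offset
print(plan_next_step(day, 51, 12, cutoff))  # next day, offset reset to 1
```

In the task itself, a non-None result would presumably become the arguments passed to task.replace(), and None would end the chain.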

This is PR 2 of 3 in the Bibliotheca → Celery migration series (PR 1 migrated the event monitor).

New files:

  • src/palace/manager/integration/license/bibliotheca_purchase_importer.py: BibliothecaPurchaseImporter with get_start(), import_day(), _purchases() (pagination), and _process_record(). Uses hash-based bibliographic_apply.delay() instead of the legacy BibliothecaBibliographicCoverageProvider.
  • bin/bibliotheca_purchase_import — manual trigger script (mirrors bin/bibliotheca_event_import from PR 1)
  • tests/manager/integration/license/test_bibliotheca_purchase_importer.py — unit tests for the importer class

Modified files:

  • src/palace/manager/celery/tasks/bibliotheca.py — adds purchase_all_collections and purchase_collection tasks with a _purchase_workflow_lock (independent Redis key from the event-import lock so both can run concurrently per collection), autoretry_for=(BadResponseException, RequestTimedOut), and replace-per-day chaining
  • src/palace/manager/service/celery/celery.py — adds daily beat schedule entry at 4:00 AM
  • src/palace/manager/integration/license/bibliotheca_scripts.py — adds ImportPurchaseCollection script for manual task dispatch
  • src/palace/manager/integration/license/bibliotheca.py — deletes BibliothecaTimelineMonitor, BibliothecaPurchaseMonitor, RunBibliothecaPurchaseMonitorScript, and their now-dead imports
  • .pre-commit-config.yaml — adds default_language_version: python: python3.12 so check-ast can parse Python 3.12 generic class syntax already present in the codebase

Deleted: bin/bibliotheca_purchase_monitor (replaced by Celery beat)

Motivation and Context

The three Bibliotheca monitors (event, purchase, circulation sweep) are currently driven by external cron jobs invoking one-shot scripts backed by a legacy Monitor base-class hierarchy. Migrating to Celery gives us centralized scheduling via beat, task.replace() to avoid monopolizing workers during long backfills, standard retry/backoff handling, and the ability to delete several hundred lines of monitor scaffolding.

How Has This Been Tested?

  • 39 tests across tests/manager/celery/tasks/test_bibliotheca.py and tests/manager/integration/license/test_bibliotheca_purchase_importer.py — all passing
  • Test coverage includes: first-run default start, stored-timestamp resume, already-up-to-date early exit, replace-per-day chaining, lock forwarding, lock-held skip/warning, lock independence from event-import lock, autoretry lock-not-released, timestamp advancement, and MARC record processing end-to-end

Checklist

  • I have updated the documentation accordingly.
  • All new and existing tests passed.

@dbernstein dbernstein added the feature New feature label May 22, 2026
@claude
Contributor

claude Bot commented May 22, 2026

Claude finished @dbernstein's task in 12m 17s


Code Review

  • Gather context: read changed files
  • Analyze implementation correctness
  • Check for bugs, edge cases, and security issues
  • Review test coverage and quality
  • Post final review

Summary

This is a clean, well-structured migration. The core lock-chain mechanism is correct: RedisLock.acquire() re-checks the stored value and, when random_value matches, extends the TTL instead of failing, so each replaced invocation transparently renews the lock and does not spuriously hit the "lock expired between days" warning path. The offset advancement (offset + _MARC_PAGE_SIZE) reproduces the legacy purchases() loop exactly. Test coverage of the happy path, lock skipping, lock independence, autoretry lock-hold, and timestamp advance is solid. One performance concern stands out, and there is a misleading test comment worth fixing.
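
To illustrate the renew-on-matching-token behavior described above, here is a small in-memory sketch. It is not the project's RedisLock: `TokenLock`, its constructor arguments, and the explicit `now` parameter are hypothetical, and a real Redis-backed lock would need to perform the check and the write atomically.

```python
from __future__ import annotations

import time


class TokenLock:
    """In-memory stand-in for a token-based lock (illustration only)."""

    def __init__(
        self,
        store: dict[str, tuple[str, float]],
        key: str,
        token: str,
        ttl: float,
    ) -> None:
        self._store = store
        self._key = key
        self._token = token
        self._ttl = ttl

    def acquire(self, now: float | None = None) -> bool:
        now = time.monotonic() if now is None else now
        held = self._store.get(self._key)
        if held is not None and held[1] > now and held[0] != self._token:
            # A different, unexpired holder owns the lock.
            return False
        # Free, expired, or already ours: write our token with a fresh expiry.
        self._store[self._key] = (self._token, now + self._ttl)
        return True


store: dict[str, tuple[str, float]] = {}
chain = TokenLock(store, "purchase:1", token="A", ttl=10)
other = TokenLock(store, "purchase:1", token="B", ttl=10)

print(chain.acquire(now=0))   # first invocation takes the lock
print(other.acquire(now=5))   # a competing workflow is refused
print(chain.acquire(now=8))   # a replaced invocation with the same token renews it
```

Because every invocation in a chain carries the same token, re-acquiring acts as a renewal, while a workflow holding a different token is refused until the lock expires.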

Details

Minor: src/palace/manager/integration/license/bibliotheca_purchase_record_importer.py:133-201

import_day calls _process_record for each of up to 50 MARC records inside task.transaction(). Each _process_record makes a synchronous HTTP call (bibliographic_lookup) and potentially dispatches a Celery task (bibliographic_apply.delay()), all while the DB transaction is held open. In the steady state this is roughly 50 sequential HTTP calls inside one transaction. During a 12-year backfill with thousands of pages this may hold DB connections for extended periods and could slow other writers.

The event importer uses a single batched HTTP call (get_events_between) per invocation inside the transaction; the purchase importer makes O(page_size) calls. A straightforward improvement is to collect the BibliographicData (and which IDs need it) outside the transaction, then commit the LicensePool / Timestamp writes in a short transaction, then dispatch Celery tasks after commit.

        records = list(
            self._api.marc_request(current_day, day_end, offset, _MARC_PAGE_SIZE)
        )
        for record in records:
            self._process_record(record, current_day)

        records_handled = len(records)
        day_complete = records_handled < _MARC_PAGE_SIZE
        next_offset = None if day_complete else offset + _MARC_PAGE_SIZE

        # Always checkpoint: advance finish to day_end when the day is done,
        # or to current_day while still in progress, so a restart after a
        # crash resumes from this day rather than the previous one.
        Timestamp.stamp(
            self._session,
            service=PURCHASE_RECORD_SERVICE_NAME,
            service_type=Timestamp.MONITOR_TYPE,
            collection=self._collection,
            start=current_day,
            finish=day_end if day_complete else current_day,
            achievements=f"MARC records processed: {records_handled}.",
        )
        return DayImportResult(
            records_handled=records_handled,
            day_start=current_day,
            day_end=day_end,
            next_offset=next_offset,
        )

    def _process_record(self, record: Record, purchase_record_time: datetime) -> None:
        """Process a single Bibliotheca MARC purchase record.

        Extracts the Bibliotheca ID from MARC field ``001``, creates or finds
        the ``LicensePool``, then queues a ``bibliographic_apply`` task when
        the title's metadata has changed (hash-based deduplication).

        :param record: A pymarc ``Record`` representing one purchased title.
        :param purchase_record_time: Timestamp of the purchase record day.
        """
        control_numbers = [f for f in record.fields if f.tag == "001"]
        if not control_numbers:
            self.log.error(
                f"Ignoring MARC record with no Bibliotheca control number. {record.as_json()}"
            )
            return
        if len(control_numbers) > 1:
            self.log.error(
                f"Ignoring MARC record with multiple Bibliotheca control numbers. {record.as_json()}"
            )
            return

        bibliotheca_id = control_numbers[0].value()
        LicensePool.for_foreign_id(
            self._session,
            self._api.data_source,
            Identifier.BIBLIOTHECA_ID,
            bibliotheca_id,
            collection=self._collection,
        )
        for bibliographic in self._api.bibliographic_lookup(bibliotheca_id):
            if bibliographic.needs_apply(self._session):
                apply.bibliographic_apply.delay(
                    bibliographic,
                    collection_id=self._collection.id,
                    replace=ReplacementPolicy.from_license_source(),
                )
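
The restructuring suggested in this comment (remote lookups first, a short transaction for the writes, task dispatch after commit) might look roughly like the sketch below. All names here are hypothetical stand-ins rather than the project's API: `lookup` plays the role of the per-record HTTP call, `write` the LicensePool creation plus the needs-apply check, and `dispatch` the `.delay()` call.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Callable, ContextManager, Iterable, Iterator


def import_page(
    record_ids: Iterable[str],
    lookup: Callable[[str], str],
    transaction: Callable[[], ContextManager[None]],
    write: Callable[[str, str], bool],
    dispatch: Callable[[str], None],
) -> int:
    """Hypothetical three-phase page import; every parameter is a stand-in."""
    # Phase 1: slow remote lookups run with no transaction open.
    fetched = [(record_id, lookup(record_id)) for record_id in record_ids]

    # Phase 2: one short transaction that only performs database writes.
    to_dispatch: list[str] = []
    with transaction():
        for record_id, data in fetched:
            if write(record_id, data):  # True means follow-up work is needed
                to_dispatch.append(data)

    # Phase 3: queue follow-up tasks only after the commit has happened.
    for data in to_dispatch:
        dispatch(data)
    return len(to_dispatch)


# Demo with fakes that record the order of operations.
events: list[str] = []


@contextmanager
def fake_transaction() -> Iterator[None]:
    events.append("begin")
    yield
    events.append("commit")


def fake_lookup(record_id: str) -> str:
    events.append(f"lookup {record_id}")
    return record_id.upper()


def fake_write(record_id: str, data: str) -> bool:
    events.append(f"write {record_id}")
    return True


def fake_dispatch(data: str) -> None:
    events.append(f"dispatch {data}")


queued = import_page(["a", "b"], fake_lookup, fake_transaction, fake_write, fake_dispatch)
print(queued, events)
```

One trade-off of this shape is that a failure between the commit and the dispatch leaves rows written without their follow-up tasks queued, so the real task would need to tolerate or retry that case.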

Nit: tests/manager/celery/tasks/test_bibliotheca.py

The comment just before the patch.object(…, "replace") block in test_first_run_starts_from_default_start_time reads:

# Stamp a timestamp so we can check the start passed to marc_request.

No timestamp is stamped in that test; the patch.object block is there to intercept task.replace() so that the test can inspect the arguments marc_request received before the chain continues. The comment is a copy-paste artifact from a different test and should be corrected (e.g. # Intercept task.replace() so we can inspect the marc_request arguments.).

        mock_task.delay.assert_has_calls(
            [call(collection_id=c1.id), call(collection_id=c2.id)],
            any_order=True,
        )
        assert mock_task.delay.call_count == 2

    def test_no_bibliotheca_collections(
        self,
        db: DatabaseTransactionFixture,
        celery_fixture: CeleryFixture,
    ) -> None:
        """import_purchase_records_for_all_collections is a no-op when there are no Bibliotheca collections."""
        db.default_collection()  # non-Bibliotheca
        with patch.object(
            bibliotheca, "import_purchase_records_by_collection"
        ) as mock_task:
            bibliotheca.import_purchase_records_for_all_collections.delay().wait()

@codecov

codecov Bot commented May 22, 2026

Codecov Report

❌ Patch coverage is 96.87500% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.35%. Comparing base (5315bfb) to head (6a34bf8).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...on/license/bibliotheca_purchase_record_importer.py | 95.00% | 2 Missing and 1 partial ⚠️ |
| src/palace/manager/celery/tasks/bibliotheca.py | 97.87% | 0 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3385      +/-   ##
==========================================
+ Coverage   93.34%   93.35%   +0.01%     
==========================================
  Files         507      508       +1     
  Lines       46434    46459      +25     
  Branches     6336     6338       +2     
==========================================
+ Hits        43345    43373      +28     
+ Misses       1999     1994       -5     
- Partials     1090     1092       +2     

Comment on lines +164 to +166
# ---------------------------------------------------------------------------
# Purchase monitor
# ---------------------------------------------------------------------------
Contributor Author

remove comment.

def _purchase_workflow_lock(
    client: Redis, collection_id: int, random_value: str
) -> RedisLock:
    """Create a workflow-level lock for the purchase monitor.
Contributor Author

monitor -> importer

Comment on lines +174 to +175
    Uses a key distinct from ``import_workflow_lock`` so that event-import
    and purchase-import runs for the same collection do not block each other.
Contributor Author

remove comment.



@shared_task(queue=QueueNames.default, bind=True)
def purchase_all_collections(task: Task) -> None:
Contributor Author

change name according to first comment.

    throws=(RemoteIntegrationException,),
    retry_backoff=60,
)
def purchase_collection(
Contributor Author

Same as above.

dbernstein and others added 6 commits May 22, 2026 14:24
Replace the legacy script-driven BibliothecaPurchaseMonitor with a
Celery-native purchase_collection task that processes one day of MARC
records per invocation and chains days via task.replace() until the
collection is caught up.

- Add BibliothecaPurchaseImporter (bibliotheca_purchase_importer.py)
  with get_start/import_day/_purchases/_process_record; uses hash-based
  bibliographic_apply.delay() instead of BibliothecaBibliographicCoverageProvider
- Add purchase_all_collections and purchase_collection Celery tasks with
  Redis workflow lock (_purchase_workflow_lock, independent key from
  import_workflow_lock), autoretry for BadResponseException/RequestTimedOut,
  and replace-per-day chaining
- Add daily beat schedule entry (4:00 AM) in celery.py
- Add ImportPurchaseCollection script + bin/bibliotheca_purchase_import
  for manual trigger (mirrors ImportEventCollection from PR 1)
- Delete BibliothecaTimelineMonitor, BibliothecaPurchaseMonitor,
  RunBibliothecaPurchaseMonitorScript from bibliotheca.py and remove
  now-dead imports
- Delete bin/bibliotheca_purchase_monitor (replaced by Celery beat)
- Add default_language_version: python3.12 to .pre-commit-config.yaml
  so check-ast can parse Python 3.12 generic class syntax already present
  in bibliotheca.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Replace %-style log arguments and implicit f-string + bare-string
concatenations with f-strings throughout the new purchase monitor files.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

- Remove TestBibliothecaPurchaseMonitor and
  TestBibliothecaPurchaseMonitorWhenMultipleCollections from
  test_bibliotheca.py, along with the now-dead BibliothecaPurchaseMonitor
  and TimestampData imports
- Add return type Iterator[MagicMock] to mock_marc_request in
  test_bibliotheca_purchase_importer.py, removing the unnecessary
  type: ignore[no-untyped-def] comment

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Previously import_day fetched all pages for a day in a single loop,
allowing a single task to process an unbounded number of records if a
day had many purchases. This brings the purchase monitor in line with
every other paginated Celery task in the codebase.

Changes:
- import_day(current_day, cutoff, offset=1) now fetches exactly one
  API page (up to _MARC_PAGE_SIZE=50 records) and returns
  DayImportResult.next_offset: set to offset+50 when the page was full
  (more records remain), or None when the page was partial (day done)
- Remove _purchases() — pagination now lives at the task level via
  task.replace(), consistent with the event-import and Overdrive patterns
- Timestamp is checkpointed after every page: finish=current_day while
  the day is in progress (so a worker crash restarts from this day, not
  the previous one), finish=day_end when the day completes
- purchase_collection gains an offset: int = 1 parameter and handles two
  replace paths: same day + next offset, or next day + offset reset to 1
- Tests updated: replace test_paginates_marc_request with
  test_returns_next_offset_when_page_is_full,
  test_returns_no_next_offset_when_page_is_partial, and
  test_passes_offset_to_marc_request; add
  test_replaces_with_next_offset_when_page_full to task tests;
  tighten test_replaces_when_more_days_remain to assert offset=1

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

All identifiers introduced for the purchase-record sync now carry the
word "Record" to make the concept explicit: BibliothecaPurchaseRecordImporter,
PURCHASE_RECORD_SERVICE_NAME, import_purchase_records_for_all_collections,
import_purchase_records_by_collection, the Redis lock key, beat-schedule
entry, bin script, and all test fixtures/classes. Pure mechanical rename;
no logic changes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Cover the five cases that mirror TestImportEventCollection:
--import-all, --collection <name>, unknown collection,
no args, and both args simultaneously.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@dbernstein dbernstein force-pushed the feature/bibliotheca-purchase-monitor-celery branch from f56fc86 to 6a34bf8 Compare May 22, 2026 21:24
Labels

feature New feature

1 participant