Migrate BibliothecaPurchaseMonitor to Celery tasks#3385
Conversation
|
Claude finished @dbernstein's task in 12m 17s —— View job Code Review
SummaryThis is a clean, well-structured migration. The core lock chain mechanism is correct: DetailsMinor:
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3385 +/- ##
==========================================
+ Coverage 93.34% 93.35% +0.01%
==========================================
Files 507 508 +1
Lines 46434 46459 +25
Branches 6336 6338 +2
==========================================
+ Hits 43345 43373 +28
+ Misses 1999 1994 -5
- Partials 1090 1092 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| # --------------------------------------------------------------------------- | ||
| # Purchase monitor | ||
| # --------------------------------------------------------------------------- |
| def _purchase_workflow_lock( | ||
| client: Redis, collection_id: int, random_value: str | ||
| ) -> RedisLock: | ||
| """Create a workflow-level lock for the purchase monitor. |
There was a problem hiding this comment.
monitor -> importer
| Uses a key distinct from ``import_workflow_lock`` so that event-import | ||
| and purchase-import runs for the same collection do not block each other. |
|
|
||
|
|
||
| @shared_task(queue=QueueNames.default, bind=True) | ||
| def purchase_all_collections(task: Task) -> None: |
There was a problem hiding this comment.
change name according to first comment.
| throws=(RemoteIntegrationException,), | ||
| retry_backoff=60, | ||
| ) | ||
| def purchase_collection( |
Replace the legacy script-driven BibliothecaPurchaseMonitor with a Celery-native purchase_collection task that processes one day of MARC records per invocation and chains days via task.replace() until the collection is caught up. - Add BibliothecaPurchaseImporter (bibliotheca_purchase_importer.py) with get_start/import_day/_purchases/_process_record; uses hash-based bibliographic_apply.delay() instead of BibliothecaBibliographicCoverageProvider - Add purchase_all_collections and purchase_collection Celery tasks with Redis workflow lock (_purchase_workflow_lock, independent key from import_workflow_lock), autoretry for BadResponseException/RequestTimedOut, and replace-per-day chaining - Add daily beat schedule entry (4:00 AM) in celery.py - Add ImportPurchaseCollection script + bin/bibliotheca_purchase_import for manual trigger (mirrors ImportEventCollection from PR 1) - Delete BibliothecaTimelineMonitor, BibliothecaPurchaseMonitor, RunBibliothecaPurchaseMonitorScript from bibliotheca.py and remove now-dead imports - Delete bin/bibliotheca_purchase_monitor (replaced by Celery beat) - Add default_language_version: python3.12 to .pre-commit-config.yaml so check-ast can parse Python 3.12 generic class syntax already present in bibliotheca.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace %-style log arguments and implicit f-string + bare-string concatenations with f-strings throughout the new purchase monitor files. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove TestBibliothecaPurchaseMonitor and TestBibliothecaPurchaseMonitorWhenMultipleCollections from test_bibliotheca.py, along with the now-dead BibliothecaPurchaseMonitor and TimestampData imports - Add return type Iterator[MagicMock] to mock_marc_request in test_bibliotheca_purchase_importer.py, removing the unnecessary type: ignore[no-untyped-def] comment Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously import_day fetched all pages for a day in a single loop, allowing a single task to process an unbounded number of records if a day had many purchases. This brings the purchase monitor in line with every other paginated Celery task in the codebase. Changes: - import_day(current_day, cutoff, offset=1) now fetches exactly one API page (up to _MARC_PAGE_SIZE=50 records) and returns DayImportResult.next_offset: set to offset+50 when the page was full (more records remain), or None when the page was partial (day done) - Remove _purchases() — pagination now lives at the task level via task.replace(), consistent with the event-import and Overdrive patterns - Timestamp is checkpointed after every page: finish=current_day while the day is in progress (so a worker crash restarts from this day, not the previous one), finish=day_end when the day completes - purchase_collection gains an offset: int = 1 parameter and handles two replace paths: same day + next offset, or next day + offset reset to 1 - Tests updated: replace test_paginates_marc_request with test_returns_next_offset_when_page_is_full, test_returns_no_next_offset_when_page_is_partial, and test_passes_offset_to_marc_request; add test_replaces_with_next_offset_when_page_full to task tests; tighten test_replaces_when_more_days_remain to assert offset=1 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All identifiers introduced for the purchase-record sync now carry the word "Record" to make the concept explicit: BibliothecaPurchaseRecordImporter, PURCHASE_RECORD_SERVICE_NAME, import_purchase_records_for_all_collections, import_purchase_records_by_collection, the Redis lock key, beat-schedule entry, bin script, and all test fixtures/classes. Pure mechanical rename; no logic changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cover the five cases that mirror TestImportEventCollection: --import-all, --collection <name>, unknown collection, no args, and both args simultaneously. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
f56fc86 to
6a34bf8
Compare
Description
Replaces the legacy script-driven
BibliothecaPurchaseMonitorwith a pair of Celery tasks that process one day of MARC purchase records per invocation and chain days viatask.replace()until the collection is caught up toutc_now().This is PR 2 of 3 in the Bibliotheca → Celery migration series (PR 1 migrated the event monitor).
New files:
src/palace/manager/integration/license/bibliotheca_purchase_importer.py—BibliothecaPurchaseImporterwithget_start(),import_day(),_purchases()(pagination), and_process_record(). Uses hash-basedbibliographic_apply.delay()instead of the legacyBibliothecaBibliographicCoverageProvider.bin/bibliotheca_purchase_import— manual trigger script (mirrorsbin/bibliotheca_event_importfrom PR 1)tests/manager/integration/license/test_bibliotheca_purchase_importer.py— unit tests for the importer classModified files:
src/palace/manager/celery/tasks/bibliotheca.py— addspurchase_all_collectionsandpurchase_collectiontasks with a_purchase_workflow_lock(independent Redis key from the event-import lock so both can run concurrently per collection),autoretry_for=(BadResponseException, RequestTimedOut), and replace-per-day chainingsrc/palace/manager/service/celery/celery.py— adds daily beat schedule entry at 4:00 AMsrc/palace/manager/integration/license/bibliotheca_scripts.py— addsImportPurchaseCollectionscript for manual task dispatchsrc/palace/manager/integration/license/bibliotheca.py— deletesBibliothecaTimelineMonitor,BibliothecaPurchaseMonitor,RunBibliothecaPurchaseMonitorScript, and their now-dead imports.pre-commit-config.yaml— addsdefault_language_version: python: python3.12socheck-astcan parse Python 3.12 generic class syntax already present in the codebaseDeleted:
bin/bibliotheca_purchase_monitor(replaced by Celery beat)Motivation and Context
The three Bibliotheca monitors (event, purchase, circulation sweep) are currently driven by external cron jobs invoking one-shot scripts backed by a legacy
Monitorbase-class hierarchy. Migrating to Celery gives us centralized scheduling via beat,task.replace()to avoid monopolizing workers during long backfills, standard retry/backoff handling, and the ability to delete several hundred lines of monitor scaffolding.How Has This Been Tested?
tests/manager/celery/tasks/test_bibliotheca.pyandtests/manager/integration/license/test_bibliotheca_purchase_importer.py— all passingChecklist