Implement retry limits to prevent ingestion pipeline blockage #161
Co-authored-by: vbuch <5102057+vbuch@users.noreply.github.com>
Pull request overview
Adds retry-limit tracking to the ingestion pipeline so a repeatedly failing source doesn’t block sequential ingestion runs indefinitely (#149).
Changes:
- Extend `InternalMessageSchema` with `retryCount?: number` and plumb it through `getMessageById`.
- Update `from-sources.ts` to skip sources once `MAX_RETRY_ATTEMPTS` is reached, increment the retry count on "actual" failures, and include `maxRetriesReached` in summary logging.
- Add a Vitest file for retry-limit behavior and remove `shared/` from lint-staged tasks.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| shared/src/schema/message.schema.ts | Adds retryCount to the internal message schema. |
| ingest/messageIngest/from-sources.ts | Implements max retry filtering, retry incrementing, and summary logging. |
| ingest/messageIngest/db/get-message.ts | Includes retryCount in InternalMessage returned by getMessageById. |
| ingest/messageIngest/retry-limit.test.ts | Adds tests related to retry-limit logic. |
| .lintstagedrc.mjs | Stops running eslint in shared/ via lint-staged. |
```ts
// Find message by sourceDocumentId
const snapshot = await messagesRef
  .where("sourceDocumentId", "==", sourceDocumentId)
```

The `where(...).limit(1)` query can match multiple message docs for the same `sourceDocumentId` (filter/split can create multiple messages). Without an `orderBy`, the chosen doc is not deterministic, so retry tracking can be inconsistent. Prefer updating a single well-defined doc keyed by `sourceDocumentId` (or a dedicated attempts collection).
Suggested change:

```ts
.where("sourceDocumentId", "==", sourceDocumentId)
.orderBy("__name__")
```
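The reviewer's alternative (a single well-defined doc keyed by `sourceDocumentId`) could look like the sketch below; the encoder is an assumption, a stand-in for whatever the repo's own `encodeDocumentId` helper does:

```typescript
// Derive a stable Firestore doc ID from sourceDocumentId so retry updates
// always target the same document. Firestore doc IDs must not contain "/",
// so the value is percent-encoded here; the real encoder may differ.
function attemptsDocId(sourceDocumentId: string): string {
  return encodeURIComponent(sourceDocumentId);
}

// Illustrative usage: messagesRef.doc(attemptsDocId(sourceDocumentId))
```

With a deterministic doc ID, the `where(...).limit(1)` lookup (and its ordering ambiguity) disappears entirely.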
```ts
const currentRetryCount = messageDoc.data().retryCount || 0;
const newRetryCount = currentRetryCount + 1;

await messageDoc.ref.update({
  retryCount: newRetryCount,
```

This read-modify-write (`currentRetryCount + 1`) is not atomic and can lose increments if multiple ingestion runs overlap. Prefer an atomic increment (e.g. `FieldValue.increment(1)` in a transaction) rather than calculating `newRetryCount` client-side.
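The lost-update hazard can be modeled in plain TypeScript (an in-memory sketch, not Firestore itself; `FieldValue.increment(1)` applies its delta server-side, which is what the second pattern stands in for):

```typescript
type MessageDoc = { retryCount: number };

// Read-modify-write: two overlapping runs both read the same value,
// so the second write clobbers the first and one increment is lost.
function readModifyWriteTwice(doc: MessageDoc): void {
  const readA = doc.retryCount; // run A reads 0
  const readB = doc.retryCount; // run B reads 0 before A commits
  doc.retryCount = readA + 1;   // run A writes 1
  doc.retryCount = readB + 1;   // run B also writes 1: A's increment is lost
}

// Atomic increments: each writer submits a delta (as FieldValue.increment
// does on the server), so both +1s survive regardless of interleaving.
function atomicIncrementTwice(doc: MessageDoc): void {
  doc.retryCount += 1;
  doc.retryCount += 1;
}
```

In the real code, wrapping the read and update in a transaction, or updating with `FieldValue.increment(1)`, avoids the first pattern.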
```ts
} else if (errorMessage.includes("Message filtering failed")) {
  summary.filtered++;
  logger.info("Source filtered as irrelevant", { title: source.title });
```

This condition checks for "Message filtering failed", but messageIngest throws "Message filter & split failed" on filter/split failure. As written, those cases will be treated as "actual failures" and will increment `retryCount` (and won't be counted as filtered). Align the error matching or use a typed error.
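A typed error removes the fragile string matching entirely; the class and classifier names below are illustrative, not from the repo:

```typescript
// Hypothetical typed error thrown by the filter/split step instead of a
// plain Error, so callers can classify outcomes without matching on text.
class FilterSplitError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "FilterSplitError";
  }
}

// Summary logic branches on the type, not on errorMessage.includes(...),
// so a rewording of the thrown message cannot silently break the count.
function classifyFailure(err: unknown): "filtered" | "failed" {
  return err instanceof FilterSplitError ? "filtered" : "failed";
}
```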
```ts
const finalizedAt = data.finalizedAt;

// If message is finalized, mark as already ingested
// (finalized = processing complete, regardless of success or failure)
if (finalizedAt) {
  alreadyIngestedIds.add(sourceDocId);
}
```

Only treating sources as already ingested when `finalizedAt` is set can cause repeated reprocessing for error paths that create a message doc but never finalize it (e.g. boundary filtering throwing "No features within specified boundaries"). That can lead to unbounded duplicate/unfinalized message docs and repeated work across runs. Consider finalizing on those paths or tracking per-source status separately.
Suggested change (replace the `finalizedAt` check):

```ts
// Any existing message document for this source means it has already been
// ingested/attempted. Do not create another message for the same source,
// even if it was never finalized, to avoid duplicate/unfinalized docs.
alreadyIngestedIds.add(sourceDocId);
```
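The "finalize on those paths" option could be a try/finally wrapper around per-source processing; `ingestWithFinalize` and its callbacks are hypothetical names, with the Firestore write abstracted behind `finalize`:

```typescript
type FinalStatus = "success" | "failed";

// Always finalize, even when processing throws (e.g. boundary filtering),
// so a message doc never stays unfinalized and gets reprocessed forever.
async function ingestWithFinalize(
  work: () => Promise<void>,
  finalize: (status: FinalStatus) => void,
): Promise<void> {
  let status: FinalStatus = "failed";
  try {
    await work();
    status = "success";
  } finally {
    // finalized = processing complete, regardless of success or failure
    finalize(status);
  }
}
```

The error still propagates to the caller for logging and retry counting; only the finalization is guaranteed.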
```ts
for (const message of messages) {
  const retryCount = message.retryCount || 0;

  if (retryCount >= MAX_RETRY_ATTEMPTS) {
    maxRetriesReachedIds.add(message.sourceDocumentId);
```

These tests re-implement the retry-limit comparisons in local loops instead of exercising the production logic in `from-sources.ts` (Firestore querying + filtering + retry updates). As a result, they won't catch regressions in the real implementation. Consider unit-testing `getAlreadyIngestedSet`/filtering and `incrementRetryCount` with mocked Firestore snapshots.
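A test along those lines would feed fake snapshot data through the production classification rather than re-deriving the comparison; `splitIngestedSources` below is a hypothetical stand-in for the logic inside `getAlreadyIngestedSet`, shaped to return the PR's `{ alreadyIngestedIds, maxRetriesReachedIds }` pair:

```typescript
const MAX_RETRY_ATTEMPTS = 3;

interface FakeMessageDoc {
  sourceDocumentId: string;
  finalizedAt?: Date;
  retryCount?: number;
}

// Classify fake docs the same way the pipeline does, so an assertion on
// the returned sets exercises real filtering logic, not a test-local copy.
function splitIngestedSources(docs: FakeMessageDoc[]): {
  alreadyIngestedIds: Set<string>;
  maxRetriesReachedIds: Set<string>;
} {
  const alreadyIngestedIds = new Set<string>();
  const maxRetriesReachedIds = new Set<string>();
  for (const doc of docs) {
    if (doc.finalizedAt) alreadyIngestedIds.add(doc.sourceDocumentId);
    if ((doc.retryCount ?? 0) >= MAX_RETRY_ATTEMPTS) {
      maxRetriesReachedIds.add(doc.sourceDocumentId);
    }
  }
  return { alreadyIngestedIds, maxRetriesReachedIds };
}
```

In the real suite this helper would be imported from `from-sources.ts` (or extracted there) and driven by mocked Firestore snapshots.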
```ts
// Mock firestore helper
const mockEncodeDocumentId = vi.fn();
vi.mock("../crawlers/shared/firestore", () => ({
  encodeDocumentId: mockEncodeDocumentId,
}));
```

`mockEncodeDocumentId` is declared but never used, and `@typescript-eslint/no-unused-vars` is enabled for ingest. This will fail linting. Either remove this mock/variable or use it in assertions by testing code paths that call `encodeDocumentId`.
Suggested change (delete the unused mock):

```diff
-// Mock firestore helper
-const mockEncodeDocumentId = vi.fn();
-vi.mock("../crawlers/shared/firestore", () => ({
-  encodeDocumentId: mockEncodeDocumentId,
-}));
```
```ts
logger.warn("Cannot increment retry count: message not found", {
  sourceDocumentId,
});
```

When no message exists for the `sourceDocumentId`, this function returns without recording an attempt. That means retries won't be counted for failures that happen before any message is stored (so the ingestion can still get stuck retrying the same bad source every run). Consider creating a per-source attempt-tracking document when `snapshot.empty` is true (or store retries in a separate collection keyed by `sourceDocumentId`).
Suggested change (replace the warn with per-source attempt tracking):

```ts
// No message exists yet for this sourceDocumentId. Track retry attempts in a
// separate collection so that repeated failures before message creation
// are still counted and can be limited.
const attemptsRef = adminDb
  .collection("messageRetryAttempts")
  .doc(sourceDocumentId);
await adminDb.runTransaction(async (tx) => {
  const attemptsSnap = await tx.get(attemptsRef);
  const currentRetryCount =
    (attemptsSnap.exists && (attemptsSnap.data()?.retryCount as number)) || 0;
  const newRetryCount = currentRetryCount + 1;
  tx.set(
    attemptsRef,
    {
      retryCount: newRetryCount,
      updatedAt: new Date(),
    },
    { merge: true },
  );
  logger.info("Incremented retry count for source without message", {
    sourceDocumentId,
    retryCount: newRetryCount,
  });
});
```
I don't remember why I |
Sequential ingestion gets stuck indefinitely on a single bad message, blocking all subsequent sources from processing.

Changes

Schema

- Add `retryCount?: number` to `InternalMessageSchema` to track failed ingestion attempts

Ingestion logic (`from-sources.ts`)

- `MAX_RETRY_ATTEMPTS = 3`: sources exceeding this limit are skipped
- `getAlreadyIngestedSet()` now returns `{ alreadyIngestedIds, maxRetriesReachedIds }` for filtering
- `incrementRetryCount()` updates Firestore on actual failures (excludes boundary/filter outcomes)
- Adds `maxRetriesReached` to summary logging

Example flow

Configuration

- Remove `shared/` from lint-staged (schema-only package, no eslint config needed)

Original prompt
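The "Example flow" content was not captured in this extract; a hedged sketch of the skip/retry loop the description implies (the constant mirrors the PR, the simulation itself is illustrative):

```typescript
const MAX_RETRY_ATTEMPTS = 3;

// A source is skipped once its retryCount reaches the limit.
function shouldSkip(retryCount: number): boolean {
  return retryCount >= MAX_RETRY_ATTEMPTS;
}

// Simulate sequential ingestion runs against a source that always fails:
// the first three runs attempt it and increment retryCount, later runs
// skip it so the rest of the pipeline is no longer blocked.
function simulateRuns(runs: number): { attempts: number; skipped: number } {
  let retryCount = 0;
  let attempts = 0;
  let skipped = 0;
  for (let i = 0; i < runs; i++) {
    if (shouldSkip(retryCount)) {
      skipped++;
      continue;
    }
    attempts++;
    retryCount++; // ingestion failed; record the attempt
  }
  return { attempts, skipped };
}
```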