-
Notifications
You must be signed in to change notification settings - Fork 6
Implement retry limits to prevent ingestion pipeline blockage #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,6 +10,12 @@ import { logger } from "@/lib/logger"; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Load environment variables | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dotenv.config({ path: resolve(process.cwd(), ".env.local"), debug: false }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Maximum number of retry attempts for a failed message | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * After this many attempts, the message will be skipped during ingestion | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const MAX_RETRY_ATTEMPTS = 3; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| interface SourceDocument { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| url: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| datePublished: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -41,6 +47,7 @@ interface IngestSummary { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| outsideBounds: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ingested: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| alreadyIngested: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxRetriesReached: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filtered: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| failed: number; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errors: Array<{ url: string; error: string }>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -122,9 +129,12 @@ async function fetchSources( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function getAlreadyIngestedSet( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| adminDb: Firestore, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sources: SourceDocument[], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<Set<string>> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| alreadyIngestedIds: Set<string>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxRetriesReachedIds: Set<string>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (sources.length === 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return new Set(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return { alreadyIngestedIds: new Set(), maxRetriesReachedIds: new Set() }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { encodeDocumentId } = await import("../crawlers/shared/firestore"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -133,26 +143,81 @@ async function getAlreadyIngestedSet( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Firestore 'in' queries support up to 30 values, so we need to batch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const BATCH_SIZE = 30; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const alreadyIngestedIds = new Set<string>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const maxRetriesReachedIds = new Set<string>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (let i = 0; i < sourceDocumentIds.length; i += BATCH_SIZE) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const batch = sourceDocumentIds.slice(i, i + BATCH_SIZE); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const messagesSnapshot = await adminDb | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .collection("messages") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .where("sourceDocumentId", "in", batch) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .select("sourceDocumentId") // Only fetch the field we need | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .select("sourceDocumentId", "retryCount", "finalizedAt") // Fetch fields we need | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .get(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const doc of messagesSnapshot.docs) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const sourceDocId = doc.data().sourceDocumentId; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (sourceDocId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const data = doc.data(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const sourceDocId = data.sourceDocumentId; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!sourceDocId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const retryCount = data.retryCount || 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const finalizedAt = data.finalizedAt; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If message is finalized, mark as already ingested | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // (finalized = processing complete, regardless of success or failure) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (finalizedAt) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| alreadyIngestedIds.add(sourceDocId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If message has exceeded max retries, skip it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (retryCount >= MAX_RETRY_ATTEMPTS) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxRetriesReachedIds.add(sourceDocId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return alreadyIngestedIds; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return { alreadyIngestedIds, maxRetriesReachedIds }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Increment retry count for a message with given sourceDocumentId | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Creates or updates the message document with incremented retryCount | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function incrementRetryCount( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| adminDb: Firestore, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sourceDocumentId: string, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const messagesRef = adminDb.collection("messages"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Find message by sourceDocumentId | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const snapshot = await messagesRef | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .where("sourceDocumentId", "==", sourceDocumentId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .where("sourceDocumentId", "==", sourceDocumentId) | |
| .where("sourceDocumentId", "==", sourceDocumentId) | |
| .orderBy("__name__") |
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When no message exists for the sourceDocumentId, this function returns without recording an attempt. That means retries won’t be counted for failures that happen before any message is stored (so the ingestion can still get stuck retrying the same bad source every run). Consider creating a per-source attempt-tracking document when snapshot.empty is true (or store retries in a separate collection keyed by sourceDocumentId).
| logger.warn("Cannot increment retry count: message not found", { | |
| sourceDocumentId, | |
| }); | |
| // No message exists yet for this sourceDocumentId. Track retry attempts in a | |
| // separate collection so that repeated failures before message creation | |
| // are still counted and can be limited. | |
| const attemptsRef = adminDb | |
| .collection("messageRetryAttempts") | |
| .doc(sourceDocumentId); | |
| await adminDb.runTransaction(async (tx) => { | |
| const attemptsSnap = await tx.get(attemptsRef); | |
| const currentRetryCount = | |
| (attemptsSnap.exists && (attemptsSnap.data()?.retryCount as number)) || | |
| 0; | |
| const newRetryCount = currentRetryCount + 1; | |
| tx.set( | |
| attemptsRef, | |
| { | |
| retryCount: newRetryCount, | |
| updatedAt: new Date(), | |
| }, | |
| { merge: true }, | |
| ); | |
| logger.info("Incremented retry count for source without message", { | |
| sourceDocumentId, | |
| retryCount: newRetryCount, | |
| }); | |
| }); |
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This read-modify-write (currentRetryCount + 1) is not atomic and can lose increments if multiple ingestion runs overlap. Prefer an atomic increment (e.g. FieldValue.increment(1) in a transaction) rather than calculating newRetryCount client-side.
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,171 @@ | ||||||||||||
| import { describe, it, expect, vi, beforeEach } from "vitest"; | ||||||||||||
|
|
||||||||||||
| // Mock firebase-admin | ||||||||||||
| vi.mock("@/lib/firebase-admin", () => ({ | ||||||||||||
| adminDb: {}, | ||||||||||||
| })); | ||||||||||||
|
|
||||||||||||
| // Mock logger | ||||||||||||
| vi.mock("@/lib/logger", () => ({ | ||||||||||||
| logger: { | ||||||||||||
| info: vi.fn(), | ||||||||||||
| warn: vi.fn(), | ||||||||||||
| error: vi.fn(), | ||||||||||||
| }, | ||||||||||||
| })); | ||||||||||||
|
|
||||||||||||
| // Mock firestore helper | ||||||||||||
| const mockEncodeDocumentId = vi.fn(); | ||||||||||||
| vi.mock("../crawlers/shared/firestore", () => ({ | ||||||||||||
| encodeDocumentId: mockEncodeDocumentId, | ||||||||||||
| })); | ||||||||||||
|
|
||||||||||||
|
Comment on lines
+17
to
+22
|
||||||||||||
| // Mock firestore helper | |
| const mockEncodeDocumentId = vi.fn(); | |
| vi.mock("../crawlers/shared/firestore", () => ({ | |
| encodeDocumentId: mockEncodeDocumentId, | |
| })); |
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests re-implement the retry-limit comparisons in local loops instead of exercising the production logic in from-sources.ts (Firestore querying + filtering + retry updates). As a result, they won’t catch regressions in the real implementation. Consider unit-testing getAlreadyIngestedSet/filtering and incrementRetryCount with mocked Firestore snapshots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only treating sources as already ingested when
finalizedAtis set can cause repeated reprocessing for error paths that create a message doc but never finalize it (e.g. boundary filtering throwing "No features within specified boundaries"). That can lead to unbounded duplicate/unfinalized message docs and repeated work across runs. Consider finalizing on those paths or tracking per-source status separately.