Skip to content

Implement retry limits to prevent ingestion pipeline blockage#161

Closed
Copilot wants to merge 3 commits intomainfrom
copilot/limit-message-retry-attempts
Closed

Implement retry limits to prevent ingestion pipeline blockage#161
Copilot wants to merge 3 commits intomainfrom
copilot/limit-message-retry-attempts

Conversation

Copy link
Contributor

Copilot AI commented Feb 12, 2026

Sequential ingestion gets stuck indefinitely on a single bad message, blocking all subsequent sources from processing.

Changes

Schema

  • Added retryCount?: number to InternalMessageSchema to track failed ingestion attempts

Ingestion logic (from-sources.ts)

  • MAX_RETRY_ATTEMPTS = 3 - sources exceeding this limit are skipped
  • getAlreadyIngestedSet() now returns { alreadyIngestedIds, maxRetriesReachedIds } for filtering
  • incrementRetryCount() updates Firestore on actual failures (excludes boundary/filter outcomes)
  • Added maxRetriesReached to summary logging

Example flow

// Attempt 1: FAIL → retryCount = 1
// Attempt 2: FAIL → retryCount = 2  
// Attempt 3: FAIL → retryCount = 3
// Next run: SKIPPED (logged as warning)

Configuration

  • Removed shared/ from lint-staged (schema-only package, no eslint config needed)
Original prompt

This section details on the original issue you should resolve

<issue_title>Ingestion is sequential which means it is blockable</issue_title>
<issue_description>Since the ingestion process is sequential, if a message fails to ingest and fails the process the next time the process runs it will attempt to process the same message. This means that the process may go completely stuck because of one wrong/bad message,

We should only retry one message a limited number of times.

We could try and see what we have stored in the process field to determine how many times a message has been attempted. If such info is not available in plain in process we could define a separate field where we store that.</issue_description>

Comments on the Issue (you are @copilot in this section)


✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

@vercel
Copy link

vercel bot commented Feb 12, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
oborishte-map Skipped Skipped Feb 12, 2026 3:58pm

Co-authored-by: vbuch <5102057+vbuch@users.noreply.github.com>
Co-authored-by: vbuch <5102057+vbuch@users.noreply.github.com>
Copilot AI changed the title [WIP] Fix blockable ingestion by limiting message retry attempts Implement retry limits to prevent ingestion pipeline blockage Feb 12, 2026
Copilot AI requested a review from vbuch February 12, 2026 16:02
@vbuch vbuch requested a review from Copilot February 27, 2026 16:52
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds retry-limit tracking to the ingestion pipeline so a repeatedly failing source doesn’t block sequential ingestion runs indefinitely (#149).

Changes:

  • Extend InternalMessageSchema with retryCount?: number and plumb it through getMessageById.
  • Update from-sources.ts to skip sources once MAX_RETRY_ATTEMPTS is reached, increment retry count on “actual” failures, and include maxRetriesReached in summary logging.
  • Add a Vitest file for retry-limit behavior and remove shared/ from lint-staged tasks.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
shared/src/schema/message.schema.ts Adds retryCount to the internal message schema.
ingest/messageIngest/from-sources.ts Implements max retry filtering, retry incrementing, and summary logging.
ingest/messageIngest/db/get-message.ts Includes retryCount in InternalMessage returned by getMessageById.
ingest/messageIngest/retry-limit.test.ts Adds tests related to retry-limit logic.
.lintstagedrc.mjs Stops running eslint in shared/ via lint-staged.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


// Find message by sourceDocumentId
const snapshot = await messagesRef
.where("sourceDocumentId", "==", sourceDocumentId)
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The where(...).limit(1) query can match multiple message docs for the same sourceDocumentId (filter/split can create multiple messages). Without an orderBy, the chosen doc is not deterministic, so retry tracking can be inconsistent. Prefer updating a single well-defined doc keyed by sourceDocumentId (or a dedicated attempts collection).

Suggested change
.where("sourceDocumentId", "==", sourceDocumentId)
.where("sourceDocumentId", "==", sourceDocumentId)
.orderBy("__name__")

Copilot uses AI. Check for mistakes.
Comment on lines +206 to +210
const currentRetryCount = messageDoc.data().retryCount || 0;
const newRetryCount = currentRetryCount + 1;

await messageDoc.ref.update({
retryCount: newRetryCount,
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read-modify-write (currentRetryCount + 1) is not atomic and can lose increments if multiple ingestion runs overlap. Prefer an atomic increment (e.g. FieldValue.increment(1) in a transaction) rather than calculating newRetryCount client-side.

Copilot uses AI. Check for mistakes.
Comment on lines 474 to 476
} else if (errorMessage.includes("Message filtering failed")) {
summary.filtered++;
logger.info("Source filtered as irrelevant", { title: source.title });
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition checks for "Message filtering failed", but messageIngest throws "Message filter & split failed" on filter/split failure. As written, those cases will be treated as "actual failures" and will increment retryCount (and won’t be counted as filtered). Align the error matching or use a typed error.

Copilot uses AI. Check for mistakes.
Comment on lines +164 to 170
const finalizedAt = data.finalizedAt;

// If message is finalized, mark as already ingested
// (finalized = processing complete, regardless of success or failure)
if (finalizedAt) {
alreadyIngestedIds.add(sourceDocId);
}
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only treating sources as already ingested when finalizedAt is set can cause repeated reprocessing for error paths that create a message doc but never finalize it (e.g. boundary filtering throwing "No features within specified boundaries"). That can lead to unbounded duplicate/unfinalized message docs and repeated work across runs. Consider finalizing on those paths or tracking per-source status separately.

Suggested change
const finalizedAt = data.finalizedAt;
// If message is finalized, mark as already ingested
// (finalized = processing complete, regardless of success or failure)
if (finalizedAt) {
alreadyIngestedIds.add(sourceDocId);
}
// Any existing message document for this source means it has already been
// ingested/attempted. Do not create another message for the same source,
// even if it was never finalized, to avoid duplicate/unfinalized docs.
alreadyIngestedIds.add(sourceDocId);

Copilot uses AI. Check for mistakes.
Comment on lines +42 to +46
for (const message of messages) {
const retryCount = message.retryCount || 0;

if (retryCount >= MAX_RETRY_ATTEMPTS) {
maxRetriesReachedIds.add(message.sourceDocumentId);
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests re-implement the retry-limit comparisons in local loops instead of exercising the production logic in from-sources.ts (Firestore querying + filtering + retry updates). As a result, they won’t catch regressions in the real implementation. Consider unit-testing getAlreadyIngestedSet/filtering and incrementRetryCount with mocked Firestore snapshots.

Copilot uses AI. Check for mistakes.
Comment on lines +17 to +22
// Mock firestore helper
const mockEncodeDocumentId = vi.fn();
vi.mock("../crawlers/shared/firestore", () => ({
encodeDocumentId: mockEncodeDocumentId,
}));

Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mockEncodeDocumentId is declared but never used, and @typescript-eslint/no-unused-vars is enabled for ingest. This will fail linting. Either remove this mock/variable or use it in assertions by testing code paths that call encodeDocumentId.

Suggested change
// Mock firestore helper
const mockEncodeDocumentId = vi.fn();
vi.mock("../crawlers/shared/firestore", () => ({
encodeDocumentId: mockEncodeDocumentId,
}));

Copilot uses AI. Check for mistakes.
Comment on lines +199 to +201
logger.warn("Cannot increment retry count: message not found", {
sourceDocumentId,
});
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no message exists for the sourceDocumentId, this function returns without recording an attempt. That means retries won’t be counted for failures that happen before any message is stored (so the ingestion can still get stuck retrying the same bad source every run). Consider creating a per-source attempt-tracking document when snapshot.empty is true (or store retries in a separate collection keyed by sourceDocumentId).

Suggested change
logger.warn("Cannot increment retry count: message not found", {
sourceDocumentId,
});
// No message exists yet for this sourceDocumentId. Track retry attempts in a
// separate collection so that repeated failures before message creation
// are still counted and can be limited.
const attemptsRef = adminDb
.collection("messageRetryAttempts")
.doc(sourceDocumentId);
await adminDb.runTransaction(async (tx) => {
const attemptsSnap = await tx.get(attemptsRef);
const currentRetryCount =
(attemptsSnap.exists && (attemptsSnap.data()?.retryCount as number)) ||
0;
const newRetryCount = currentRetryCount + 1;
tx.set(
attemptsRef,
{
retryCount: newRetryCount,
updatedAt: new Date(),
},
{ merge: true },
);
logger.info("Incremented retry count for source without message", {
sourceDocumentId,
retryCount: newRetryCount,
});
});

Copilot uses AI. Check for mistakes.
@vbuch
Copy link
Owner

vbuch commented Feb 27, 2026

I don't remember why I

@vbuch vbuch closed this Feb 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ingestion is sequential which means it is blockable

3 participants