Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .lintstagedrc.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ function createLintTask(dir, filenames) {
export default {
"web/**/*.{js,jsx,ts,tsx}": (filenames) => createLintTask("web", filenames),
"ingest/**/*.{js,ts}": (filenames) => createLintTask("ingest", filenames),
"shared/**/*.{js,ts}": (filenames) => createLintTask("shared", filenames),
};
1 change: 1 addition & 0 deletions ingest/messageIngest/db/get-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ export async function getMessageById(
: data.ingestErrors,
sourceDocumentId: data.sourceDocumentId,
isRelevant: data.isRelevant,
retryCount: data.retryCount,
};
}
116 changes: 104 additions & 12 deletions ingest/messageIngest/from-sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import { logger } from "@/lib/logger";
// Load environment variables
dotenv.config({ path: resolve(process.cwd(), ".env.local"), debug: false });

/**
* Maximum number of retry attempts for a failed message
* After this many attempts, the message will be skipped during ingestion
*/
const MAX_RETRY_ATTEMPTS = 3;

interface SourceDocument {
url: string;
datePublished: string;
Expand Down Expand Up @@ -41,6 +47,7 @@ interface IngestSummary {
outsideBounds: number;
ingested: number;
alreadyIngested: number;
maxRetriesReached: number;
filtered: number;
failed: number;
errors: Array<{ url: string; error: string }>;
Expand Down Expand Up @@ -122,9 +129,12 @@ async function fetchSources(
async function getAlreadyIngestedSet(
adminDb: Firestore,
sources: SourceDocument[],
): Promise<Set<string>> {
): Promise<{
alreadyIngestedIds: Set<string>;
maxRetriesReachedIds: Set<string>;
}> {
if (sources.length === 0) {
return new Set();
return { alreadyIngestedIds: new Set(), maxRetriesReachedIds: new Set() };
}

const { encodeDocumentId } = await import("../crawlers/shared/firestore");
Expand All @@ -133,26 +143,81 @@ async function getAlreadyIngestedSet(
// Firestore 'in' queries support up to 30 values, so we need to batch
const BATCH_SIZE = 30;
const alreadyIngestedIds = new Set<string>();
const maxRetriesReachedIds = new Set<string>();

for (let i = 0; i < sourceDocumentIds.length; i += BATCH_SIZE) {
const batch = sourceDocumentIds.slice(i, i + BATCH_SIZE);
const messagesSnapshot = await adminDb
.collection("messages")
.where("sourceDocumentId", "in", batch)
.select("sourceDocumentId") // Only fetch the field we need
.select("sourceDocumentId", "retryCount", "finalizedAt") // Fetch fields we need
.get();

for (const doc of messagesSnapshot.docs) {
const sourceDocId = doc.data().sourceDocumentId;
if (sourceDocId) {
const data = doc.data();
const sourceDocId = data.sourceDocumentId;
if (!sourceDocId) {
continue;
}

const retryCount = data.retryCount || 0;
const finalizedAt = data.finalizedAt;

// If message is finalized, mark as already ingested
// (finalized = processing complete, regardless of success or failure)
if (finalizedAt) {
alreadyIngestedIds.add(sourceDocId);
}
Comment on lines +164 to 170
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only treating sources as already ingested when finalizedAt is set can cause repeated reprocessing for error paths that create a message doc but never finalize it (e.g. boundary filtering throwing "No features within specified boundaries"). That can lead to unbounded duplicate/unfinalized message docs and repeated work across runs. Consider finalizing on those paths or tracking per-source status separately.

Suggested change
const finalizedAt = data.finalizedAt;
// If message is finalized, mark as already ingested
// (finalized = processing complete, regardless of success or failure)
if (finalizedAt) {
alreadyIngestedIds.add(sourceDocId);
}
// Any existing message document for this source means it has already been
// ingested/attempted. Do not create another message for the same source,
// even if it was never finalized, to avoid duplicate/unfinalized docs.
alreadyIngestedIds.add(sourceDocId);

Copilot uses AI. Check for mistakes.

// If message has exceeded max retries, skip it
if (retryCount >= MAX_RETRY_ATTEMPTS) {
maxRetriesReachedIds.add(sourceDocId);
}
}
}

return alreadyIngestedIds;
return { alreadyIngestedIds, maxRetriesReachedIds };
}

/**
* Increment retry count for a message with given sourceDocumentId
* Creates or updates the message document with incremented retryCount
*/
async function incrementRetryCount(
adminDb: Firestore,
sourceDocumentId: string,
): Promise<void> {
const messagesRef = adminDb.collection("messages");

// Find message by sourceDocumentId
const snapshot = await messagesRef
.where("sourceDocumentId", "==", sourceDocumentId)
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The where(...).limit(1) query can match multiple message docs for the same sourceDocumentId (filter/split can create multiple messages). Without an orderBy, the chosen doc is not deterministic, so retry tracking can be inconsistent. Prefer updating a single well-defined doc keyed by sourceDocumentId (or a dedicated attempts collection).

Suggested change
.where("sourceDocumentId", "==", sourceDocumentId)
.where("sourceDocumentId", "==", sourceDocumentId)
.orderBy("__name__")

Copilot uses AI. Check for mistakes.
.limit(1)
.get();

if (snapshot.empty) {
logger.warn("Cannot increment retry count: message not found", {
sourceDocumentId,
});
Comment on lines +199 to +201
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no message exists for the sourceDocumentId, this function returns without recording an attempt. That means retries won’t be counted for failures that happen before any message is stored (so the ingestion can still get stuck retrying the same bad source every run). Consider creating a per-source attempt-tracking document when snapshot.empty is true (or store retries in a separate collection keyed by sourceDocumentId).

Suggested change
logger.warn("Cannot increment retry count: message not found", {
sourceDocumentId,
});
// No message exists yet for this sourceDocumentId. Track retry attempts in a
// separate collection so that repeated failures before message creation
// are still counted and can be limited.
const attemptsRef = adminDb
.collection("messageRetryAttempts")
.doc(sourceDocumentId);
await adminDb.runTransaction(async (tx) => {
const attemptsSnap = await tx.get(attemptsRef);
const currentRetryCount =
(attemptsSnap.exists && (attemptsSnap.data()?.retryCount as number)) ||
0;
const newRetryCount = currentRetryCount + 1;
tx.set(
attemptsRef,
{
retryCount: newRetryCount,
updatedAt: new Date(),
},
{ merge: true },
);
logger.info("Incremented retry count for source without message", {
sourceDocumentId,
retryCount: newRetryCount,
});
});

Copilot uses AI. Check for mistakes.
return;
}

const messageDoc = snapshot.docs[0];
const currentRetryCount = messageDoc.data().retryCount || 0;
const newRetryCount = currentRetryCount + 1;

await messageDoc.ref.update({
retryCount: newRetryCount,
Comment on lines +206 to +210
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read-modify-write (currentRetryCount + 1) is not atomic and can lose increments if multiple ingestion runs overlap. Prefer an atomic increment (e.g. FieldValue.increment(1) in a transaction) rather than calculating newRetryCount client-side.

Copilot uses AI. Check for mistakes.
});

logger.info("Incremented retry count for message", {
messageId: messageDoc.id,
sourceDocumentId,
retryCount: newRetryCount,
});
}


async function ingestSource(
source: SourceDocument,
adminDb: Firestore,
Expand Down Expand Up @@ -341,25 +406,43 @@ export async function ingest(

// Batch-check which sources are already ingested (avoids 1371 sequential DB queries!)
const { encodeDocumentId } = await import("../crawlers/shared/firestore");
const alreadyIngestedSet = await getAlreadyIngestedSet(adminDb, withinBounds);
const sourcesToIngest = withinBounds.filter(
(s) => !alreadyIngestedSet.has(encodeDocumentId(s.url)),
);
const alreadyIngestedCount = withinBounds.length - sourcesToIngest.length;
const { alreadyIngestedIds, maxRetriesReachedIds } =
await getAlreadyIngestedSet(adminDb, withinBounds);

const sourcesToIngest = withinBounds.filter((s) => {
const docId = encodeDocumentId(s.url);
return !alreadyIngestedIds.has(docId) && !maxRetriesReachedIds.has(docId);
});

const alreadyIngestedCount = withinBounds.filter((s) =>
alreadyIngestedIds.has(encodeDocumentId(s.url)),
).length;

const maxRetriesReachedCount = withinBounds.filter((s) =>
maxRetriesReachedIds.has(encodeDocumentId(s.url)),
).length;

if (alreadyIngestedCount > 0) {
logger.info("Skipping already-ingested sources", {
count: alreadyIngestedCount,
});
}

if (maxRetriesReachedCount > 0) {
logger.warn("Skipping sources that exceeded max retry attempts", {
count: maxRetriesReachedCount,
maxRetries: MAX_RETRY_ATTEMPTS,
});
}

const summary: IngestSummary = {
total: allSources.length,
tooOld,
withinBounds: withinBounds.length,
outsideBounds,
ingested: 0,
alreadyIngested: alreadyIngestedCount,
maxRetriesReached: maxRetriesReachedCount,
filtered: 0,
failed: 0,
errors: [],
Expand Down Expand Up @@ -392,6 +475,9 @@ export async function ingest(
summary.filtered++;
logger.info("Source filtered as irrelevant", { title: source.title });
} else {
// Increment retry count for actual failures (not boundary/filter issues)
await incrementRetryCount(adminDb, sourceDocumentId);

summary.errors.push({ url: source.url, error: errorMessage });
logger.error("Failed to ingest source", {
title: source.title,
Expand Down Expand Up @@ -420,10 +506,16 @@ function logSummary(summary: IngestSummary, dryRun: boolean): void {
summaryData.outsideBounds = summary.outsideBounds;
}
if (dryRun) {
summaryData.wouldIngest = summary.withinBounds - summary.alreadyIngested;
summaryData.wouldIngest =
summary.withinBounds -
summary.alreadyIngested -
summary.maxRetriesReached;
} else {
summaryData.ingested = summary.ingested;
summaryData.alreadyIngested = summary.alreadyIngested;
if (summary.maxRetriesReached > 0) {
summaryData.maxRetriesReached = summary.maxRetriesReached;
}
if (summary.filtered > 0) {
summaryData.filtered = summary.filtered;
summaryData.filterPercentage = (
Expand Down
171 changes: 171 additions & 0 deletions ingest/messageIngest/retry-limit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { describe, it, expect, vi, beforeEach } from "vitest";

// Mock firebase-admin so no real Firestore connection is attempted.
vi.mock("@/lib/firebase-admin", () => ({
  adminDb: {},
}));

// Mock logger to silence output during tests.
vi.mock("@/lib/logger", () => ({
  logger: {
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  },
}));

// Mock firestore helper. The mock fn is created inline: storing it in a
// top-level variable that nothing referenced tripped
// @typescript-eslint/no-unused-vars and failed linting.
vi.mock("../crawlers/shared/firestore", () => ({
  encodeDocumentId: vi.fn(),
}));

Comment on lines +17 to +22
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mockEncodeDocumentId is declared but never used, and @typescript-eslint/no-unused-vars is enabled for ingest. This will fail linting. Either remove this mock/variable or use it in assertions by testing code paths that call encodeDocumentId.

Suggested change
// Mock firestore helper
const mockEncodeDocumentId = vi.fn();
vi.mock("../crawlers/shared/firestore", () => ({
encodeDocumentId: mockEncodeDocumentId,
}));

Copilot uses AI. Check for mistakes.
describe("Retry limit logic", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("should skip messages that have exceeded max retry attempts", async () => {
const MAX_RETRY_ATTEMPTS = 3;

// Mock messages with various retry counts
const messages = [
{ sourceDocumentId: "doc1", retryCount: 0, finalizedAt: null },
{ sourceDocumentId: "doc2", retryCount: 1, finalizedAt: null },
{ sourceDocumentId: "doc3", retryCount: 2, finalizedAt: null },
{ sourceDocumentId: "doc4", retryCount: 3, finalizedAt: null }, // Should be skipped
{ sourceDocumentId: "doc5", retryCount: 5, finalizedAt: null }, // Should be skipped
];

const maxRetriesReachedIds = new Set<string>();

for (const message of messages) {
const retryCount = message.retryCount || 0;

if (retryCount >= MAX_RETRY_ATTEMPTS) {
maxRetriesReachedIds.add(message.sourceDocumentId);
Comment on lines +42 to +46
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests re-implement the retry-limit comparisons in local loops instead of exercising the production logic in from-sources.ts (Firestore querying + filtering + retry updates). As a result, they won’t catch regressions in the real implementation. Consider unit-testing getAlreadyIngestedSet/filtering and incrementRetryCount with mocked Firestore snapshots.

Copilot uses AI. Check for mistakes.
}
}

expect(maxRetriesReachedIds.size).toBe(2);
expect(maxRetriesReachedIds.has("doc4")).toBe(true);
expect(maxRetriesReachedIds.has("doc5")).toBe(true);
expect(maxRetriesReachedIds.has("doc1")).toBe(false);
expect(maxRetriesReachedIds.has("doc2")).toBe(false);
expect(maxRetriesReachedIds.has("doc3")).toBe(false);
});

it("should treat missing retryCount as 0", async () => {
const MAX_RETRY_ATTEMPTS = 3;

const messages = [
{ sourceDocumentId: "doc1", finalizedAt: null }, // No retryCount field
{ sourceDocumentId: "doc2", retryCount: undefined, finalizedAt: null },
{ sourceDocumentId: "doc3", retryCount: null, finalizedAt: null },
];

const maxRetriesReachedIds = new Set<string>();

for (const message of messages) {
const retryCount = message.retryCount || 0;

if (retryCount >= MAX_RETRY_ATTEMPTS) {
maxRetriesReachedIds.add(message.sourceDocumentId);
}
}

expect(maxRetriesReachedIds.size).toBe(0);
});

it("should mark messages as already ingested if finalized", async () => {
const messages = [
{
sourceDocumentId: "doc1",
retryCount: 0,
finalizedAt: new Date("2024-01-15"),
},
{
sourceDocumentId: "doc2",
retryCount: 1,
finalizedAt: new Date("2024-01-16"),
},
{ sourceDocumentId: "doc3", retryCount: 2, finalizedAt: null },
{ sourceDocumentId: "doc4", retryCount: 3, finalizedAt: null },
];

const alreadyIngestedIds = new Set<string>();

for (const message of messages) {
const finalizedAt = message.finalizedAt;

if (finalizedAt) {
alreadyIngestedIds.add(message.sourceDocumentId);
}
}

expect(alreadyIngestedIds.size).toBe(2);
expect(alreadyIngestedIds.has("doc1")).toBe(true);
expect(alreadyIngestedIds.has("doc2")).toBe(true);
expect(alreadyIngestedIds.has("doc3")).toBe(false);
expect(alreadyIngestedIds.has("doc4")).toBe(false);
});

it("should allow filtering both finalized and max-retry messages", async () => {
const MAX_RETRY_ATTEMPTS = 3;

const messages = [
{
sourceDocumentId: "doc1",
retryCount: 0,
finalizedAt: new Date("2024-01-15"),
}, // Already ingested
{
sourceDocumentId: "doc2",
retryCount: 1,
finalizedAt: new Date("2024-01-16"),
}, // Already ingested
{ sourceDocumentId: "doc3", retryCount: 2, finalizedAt: null }, // Can retry
{ sourceDocumentId: "doc4", retryCount: 3, finalizedAt: null }, // Max retries reached
{ sourceDocumentId: "doc5", retryCount: 5, finalizedAt: null }, // Max retries reached
];

const alreadyIngestedIds = new Set<string>();
const maxRetriesReachedIds = new Set<string>();

for (const message of messages) {
const retryCount = message.retryCount || 0;
const finalizedAt = message.finalizedAt;

if (finalizedAt) {
alreadyIngestedIds.add(message.sourceDocumentId);
}

if (retryCount >= MAX_RETRY_ATTEMPTS) {
maxRetriesReachedIds.add(message.sourceDocumentId);
}
}

// Filter sources to ingest
const sources = messages.map((m) => m.sourceDocumentId);
const sourcesToIngest = sources.filter(
(docId) =>
!alreadyIngestedIds.has(docId) && !maxRetriesReachedIds.has(docId),
);

expect(sourcesToIngest.length).toBe(1);
expect(sourcesToIngest[0]).toBe("doc3");
expect(alreadyIngestedIds.size).toBe(2);
expect(maxRetriesReachedIds.size).toBe(2);
});

it("should increment retry count correctly", async () => {
let retryCount = 0;

// Simulate 5 failures
for (let i = 0; i < 5; i++) {
retryCount = retryCount + 1;
}

expect(retryCount).toBe(5);
});
});
1 change: 1 addition & 0 deletions shared/src/schema/message.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export const InternalMessageSchema = MessageSchema.extend({
ingestErrors: z.array(IngestErrorSchema).optional(),
sourceDocumentId: z.string().optional(),
isRelevant: z.boolean().optional(),
retryCount: z.number().optional(),
});

export type InternalMessage = z.infer<typeof InternalMessageSchema>;