Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5269,4 +5269,121 @@ describe('TxPoolV2', () => {
expect(await pool.getTxStatus(tx.getTxHash())).toBeUndefined();
});
});

describe('persistence consistency', () => {
it('pool state is consistent across restart when getTxEffect throws for a later tx in batch', async () => {
const testStore = await openTmpStore('p2p-comeback-gettxeffect');
const testArchiveStore = await openTmpStore('archive-comeback-gettxeffect');

try {
const pool1 = new AztecKVTxPoolV2(testStore, testArchiveStore, {
l2BlockSource: mockL2BlockSource,
worldStateSynchronizer: mockWorldState,
createTxValidator: () => Promise.resolve(alwaysValidValidator),
});
await pool1.start();

// Add tx1 (fee=5) with a nullifier
const tx1 = await mockPublicTx(1, 5);
await pool1.addPendingTxs([tx1]);
expect(await pool1.getTxStatus(tx1.getTxHash())).toBe('pending');

// Create tx2 (same nullifier as tx1, higher fee — will evict tx1) and tx3 (different nullifiers)
const tx2 = await mockPublicTx(2, 10);
setNullifier(tx2, 0, getNullifier(tx1, 0));
const tx3 = await mockPublicTx(3, 1);

// Mock getTxEffect to throw for tx3 (simulates L2BlockSource I/O failure)
const tx3HashStr = tx3.getTxHash().toString();
mockL2BlockSource.getTxEffect.mockImplementation((txHash: TxHash) => {
if (txHash.toString() === tx3HashStr) {
throw new Error('Simulated L2BlockSource failure');
}
return Promise.resolve(undefined);
});

// Batch fails because tx3's getMinedBlockId throws
await expect(pool1.addPendingTxs([tx2, tx3])).rejects.toThrow('Simulated L2BlockSource failure');

const statusBeforeRestart = await pool1.getTxStatus(tx1.getTxHash());

await pool1.stop();
mockL2BlockSource.getTxEffect.mockResolvedValue(undefined);

const pool2 = new AztecKVTxPoolV2(testStore, testArchiveStore, {
l2BlockSource: mockL2BlockSource,
worldStateSynchronizer: mockWorldState,
createTxValidator: () => Promise.resolve(alwaysValidValidator),
});
await pool2.start();

const statusAfterRestart = await pool2.getTxStatus(tx1.getTxHash());
expect(statusAfterRestart).toBe(statusBeforeRestart);

await pool2.stop();
} finally {
mockL2BlockSource.getTxEffect.mockResolvedValue(undefined);
await testStore.delete();
await testArchiveStore.delete();
}
});

it('pool state is consistent across restart when validateMeta throws for a later tx in batch', async () => {
const testStore = await openTmpStore('p2p-comeback-validatemeta');
const testArchiveStore = await openTmpStore('archive-comeback-validatemeta');

try {
// Create a validator that throws (not rejects) for tx3
let tx3HashStr = '';
const throwingValidator: TxValidator<TxMetaData> = {
validateTx: (meta: TxMetaData) => {
if (meta.txHash === tx3HashStr) {
throw new Error('Simulated validator crash');
}
return Promise.resolve({ result: 'valid' });
},
};

const pool1 = new AztecKVTxPoolV2(testStore, testArchiveStore, {
l2BlockSource: mockL2BlockSource,
worldStateSynchronizer: mockWorldState,
createTxValidator: () => Promise.resolve(throwingValidator),
});
await pool1.start();

// Add tx1 (fee=5) with a nullifier
const tx1 = await mockPublicTx(1, 5);
await pool1.addPendingTxs([tx1]);
expect(await pool1.getTxStatus(tx1.getTxHash())).toBe('pending');

// Create tx2 (same nullifier as tx1, higher fee — will evict tx1) and tx3 (different nullifiers)
const tx2 = await mockPublicTx(2, 10);
setNullifier(tx2, 0, getNullifier(tx1, 0));
const tx3 = await mockPublicTx(3, 1);
tx3HashStr = tx3.getTxHash().toString();

// Batch fails because tx3's validateMeta throws
await expect(pool1.addPendingTxs([tx2, tx3])).rejects.toThrow('Simulated validator crash');

const statusBeforeRestart = await pool1.getTxStatus(tx1.getTxHash());

await pool1.stop();

const pool2 = new AztecKVTxPoolV2(testStore, testArchiveStore, {
l2BlockSource: mockL2BlockSource,
worldStateSynchronizer: mockWorldState,
createTxValidator: () => Promise.resolve(alwaysValidValidator),
});
await pool2.start();

const statusAfterRestart = await pool2.getTxStatus(tx1.getTxHash());
expect(statusAfterRestart).toBe(statusBeforeRestart);

await pool2.stop();
} finally {
await testStore.delete();
await testArchiveStore.delete();
}
});
});
});
60 changes: 40 additions & 20 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,30 @@ export class TxPoolV2Impl {
const errors = new Map<string, TxPoolRejectionError>();
const acceptedPending = new Set<string>();

// Phase 1: Pre-compute all throwable I/O outside the transaction.
// If any pre-computation throws, the entire call fails before mutations happen.
const precomputed = new Map<string, { meta: TxMetaData; minedBlockId: L2BlockId | undefined; isValid: boolean }>();

const validator = await this.#createTxValidator();

for (const tx of txs) {
const txHash = tx.getTxHash();
const txHashStr = txHash.toString();

const meta = await buildTxMetaData(tx);
const minedBlockId = await this.#getMinedBlockId(txHash);

// Validate non-mined txs (mined and pre-protected txs bypass validation inside the transaction)
let isValid = true;
if (!minedBlockId) {
isValid = await this.#validateMeta(meta, validator);
}

precomputed.set(txHashStr, { meta, minedBlockId, isValid });
}

// Phase 2: Apply mutations inside the transaction using only pre-computed results,
// in-memory reads, and buffered DB writes. Nothing here can throw an unhandled exception.
const poolAccess = this.#createPreAddPoolAccess();
const preAddContext: PreAddContext | undefined =
opts.feeComparisonOnly !== undefined ? { feeComparisonOnly: opts.feeComparisonOnly } : undefined;
Expand All @@ -202,22 +226,25 @@ export class TxPoolV2Impl {
continue;
}

// Check mined status first (applies to all paths)
const minedBlockId = await this.#getMinedBlockId(txHash);
const { meta, minedBlockId, isValid } = precomputed.get(txHashStr)!;
const preProtectedSlot = this.#indices.getProtectionSlot(txHashStr);

if (minedBlockId) {
// Already mined - add directly (protection already set if pre-protected)
await this.#addTx(tx, { mined: minedBlockId }, opts);
await this.#addTx(tx, { mined: minedBlockId }, opts, meta);
accepted.push(txHash);
} else if (preProtectedSlot !== undefined) {
// Pre-protected and not mined - add as protected (bypass validation)
await this.#addTx(tx, { protected: preProtectedSlot }, opts);
await this.#addTx(tx, { protected: preProtectedSlot }, opts, meta);
accepted.push(txHash);
} else if (!isValid) {
// Failed pre-computed validation
rejected.push(txHash);
} else {
// Regular pending tx - validate and run pre-add rules
// Regular pending tx - run pre-add rules using pre-computed metadata
const result = await this.#tryAddRegularPendingTx(
tx,
meta,
opts,
poolAccess,
acceptedPending,
Expand All @@ -227,8 +254,6 @@ export class TxPoolV2Impl {
);
if (result.status === 'accepted') {
acceptedPending.add(txHashStr);
} else if (result.status === 'rejected') {
rejected.push(txHash);
} else {
ignored.push(txHash);
}
Expand Down Expand Up @@ -259,27 +284,21 @@ export class TxPoolV2Impl {
return { accepted, ignored, rejected, ...(errors.size > 0 ? { errors } : {}) };
}

/** Validates and adds a regular pending tx. Returns status. */
/** Adds a validated pending tx, running pre-add rules and evicting conflicts. */
async #tryAddRegularPendingTx(
tx: Tx,
precomputedMeta: TxMetaData,
opts: { source?: string },
poolAccess: PreAddPoolAccess,
acceptedPending: Set<string>,
ignored: TxHash[],
errors: Map<string, TxPoolRejectionError>,
preAddContext?: PreAddContext,
): Promise<{ status: 'accepted' | 'ignored' | 'rejected' }> {
const txHash = tx.getTxHash();
const txHashStr = txHash.toString();

// Build metadata and validate using metadata
const meta = await buildTxMetaData(tx);
if (!(await this.#validateMeta(meta))) {
return { status: 'rejected' };
}
): Promise<{ status: 'accepted' | 'ignored' }> {
const txHashStr = tx.getTxHash().toString();

// Run pre-add rules
const preAddResult = await this.#evictionManager.runPreAddRules(meta, poolAccess, preAddContext);
const preAddResult = await this.#evictionManager.runPreAddRules(precomputedMeta, poolAccess, preAddContext);

if (preAddResult.shouldIgnore) {
this.#log.debug(`Ignoring tx ${txHashStr}: ${preAddResult.reason?.message ?? 'unknown reason'}`);
Expand Down Expand Up @@ -323,7 +342,7 @@ export class TxPoolV2Impl {
}

// Add the transaction
await this.#addTx(tx, 'pending', opts);
await this.#addTx(tx, 'pending', opts, precomputedMeta);
return { status: 'accepted' };
}

Expand Down Expand Up @@ -765,9 +784,10 @@ export class TxPoolV2Impl {
tx: Tx,
state: 'pending' | { protected: SlotNumber } | { mined: L2BlockId },
opts: { source?: string } = {},
precomputedMeta?: TxMetaData,
): Promise<TxMetaData> {
const txHashStr = tx.getTxHash().toString();
const meta = await buildTxMetaData(tx);
const meta = precomputedMeta ?? (await buildTxMetaData(tx));
meta.receivedAt = this.#dateProvider.now();

await this.#txsDB.set(txHashStr, tx.toBuffer());
Expand Down
Loading