diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts index aa0b9af8affa..f778f6bb3f14 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts @@ -5269,4 +5269,121 @@ describe('TxPoolV2', () => { expect(await pool.getTxStatus(tx.getTxHash())).toBeUndefined(); }); }); + + describe('persistence consistency', () => { + it('pool state is consistent across restart when getTxEffect throws for a later tx in batch', async () => { + const testStore = await openTmpStore('p2p-comeback-gettxeffect'); + const testArchiveStore = await openTmpStore('archive-comeback-gettxeffect'); + + try { + const pool1 = new AztecKVTxPoolV2(testStore, testArchiveStore, { + l2BlockSource: mockL2BlockSource, + worldStateSynchronizer: mockWorldState, + createTxValidator: () => Promise.resolve(alwaysValidValidator), + }); + await pool1.start(); + + // Add tx1 (fee=5) with a nullifier + const tx1 = await mockPublicTx(1, 5); + await pool1.addPendingTxs([tx1]); + expect(await pool1.getTxStatus(tx1.getTxHash())).toBe('pending'); + + // Create tx2 (same nullifier as tx1, higher fee — will evict tx1) and tx3 (different nullifiers) + const tx2 = await mockPublicTx(2, 10); + setNullifier(tx2, 0, getNullifier(tx1, 0)); + const tx3 = await mockPublicTx(3, 1); + + // Mock getTxEffect to throw for tx3 (simulates L2BlockSource I/O failure) + const tx3HashStr = tx3.getTxHash().toString(); + mockL2BlockSource.getTxEffect.mockImplementation((txHash: TxHash) => { + if (txHash.toString() === tx3HashStr) { + throw new Error('Simulated L2BlockSource failure'); + } + return Promise.resolve(undefined); + }); + + // Batch fails because tx3's getMinedBlockId throws + await expect(pool1.addPendingTxs([tx2, tx3])).rejects.toThrow('Simulated L2BlockSource failure'); + + const statusBeforeRestart = await pool1.getTxStatus(tx1.getTxHash()); + + await pool1.stop(); + mockL2BlockSource.getTxEffect.mockResolvedValue(undefined); + + const pool2 = new AztecKVTxPoolV2(testStore, testArchiveStore, { + l2BlockSource: mockL2BlockSource, + worldStateSynchronizer: mockWorldState, + createTxValidator: () => Promise.resolve(alwaysValidValidator), + }); + await pool2.start(); + + const statusAfterRestart = await pool2.getTxStatus(tx1.getTxHash()); + expect(statusAfterRestart).toBe(statusBeforeRestart); + + await pool2.stop(); + } finally { + mockL2BlockSource.getTxEffect.mockResolvedValue(undefined); + await testStore.delete(); + await testArchiveStore.delete(); + } + }); + + it('pool state is consistent across restart when validateMeta throws for a later tx in batch', async () => { + const testStore = await openTmpStore('p2p-comeback-validatemeta'); + const testArchiveStore = await openTmpStore('archive-comeback-validatemeta'); + + try { + // Create a validator that throws (not rejects) for tx3 + let tx3HashStr = ''; + const throwingValidator: TxValidator = { + validateTx: (meta: TxMetaData) => { + if (meta.txHash === tx3HashStr) { + throw new Error('Simulated validator crash'); + } + return Promise.resolve({ result: 'valid' }); + }, + }; + + const pool1 = new AztecKVTxPoolV2(testStore, testArchiveStore, { + l2BlockSource: mockL2BlockSource, + worldStateSynchronizer: mockWorldState, + createTxValidator: () => Promise.resolve(throwingValidator), + }); + await pool1.start(); + + // Add tx1 (fee=5) with a nullifier + const tx1 = await mockPublicTx(1, 5); + await pool1.addPendingTxs([tx1]); + expect(await pool1.getTxStatus(tx1.getTxHash())).toBe('pending'); + + // Create tx2 (same nullifier as tx1, higher fee — will evict tx1) and tx3 (different nullifiers) + const tx2 = await mockPublicTx(2, 10); + setNullifier(tx2, 0, getNullifier(tx1, 0)); + const tx3 = await mockPublicTx(3, 1); + tx3HashStr = tx3.getTxHash().toString(); + + // Batch fails because tx3's validateMeta throws + await expect(pool1.addPendingTxs([tx2, tx3])).rejects.toThrow('Simulated validator crash'); + + const statusBeforeRestart = await pool1.getTxStatus(tx1.getTxHash()); + + await pool1.stop(); + + const pool2 = new AztecKVTxPoolV2(testStore, testArchiveStore, { + l2BlockSource: mockL2BlockSource, + worldStateSynchronizer: mockWorldState, + createTxValidator: () => Promise.resolve(alwaysValidValidator), + }); + await pool2.start(); + + const statusAfterRestart = await pool2.getTxStatus(tx1.getTxHash()); + expect(statusAfterRestart).toBe(statusBeforeRestart); + + await pool2.stop(); + } finally { + await testStore.delete(); + await testArchiveStore.delete(); + } + }); + }); }); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts index 94e4f224dc94..88f6e887b9a8 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts @@ -187,6 +187,30 @@ export class TxPoolV2Impl { const errors = new Map(); const acceptedPending = new Set(); + // Phase 1: Pre-compute all throwable I/O outside the transaction. + // If any pre-computation throws, the entire call fails before mutations happen. + const precomputed = new Map(); + + const validator = await this.#createTxValidator(); + + for (const tx of txs) { + const txHash = tx.getTxHash(); + const txHashStr = txHash.toString(); + + const meta = await buildTxMetaData(tx); + const minedBlockId = await this.#getMinedBlockId(txHash); + + // Validate non-mined txs (mined and pre-protected txs bypass validation inside the transaction) + let isValid = true; + if (!minedBlockId) { + isValid = await this.#validateMeta(meta, validator); + } + + precomputed.set(txHashStr, { meta, minedBlockId, isValid }); + } + + // Phase 2: Apply mutations inside the transaction using only pre-computed results, + // in-memory reads, and buffered DB writes. Nothing here can throw an unhandled exception. const poolAccess = this.#createPreAddPoolAccess(); const preAddContext: PreAddContext | undefined = opts.feeComparisonOnly !== undefined ? { feeComparisonOnly: opts.feeComparisonOnly } : undefined; @@ -202,22 +226,25 @@ export class TxPoolV2Impl { continue; } - // Check mined status first (applies to all paths) - const minedBlockId = await this.#getMinedBlockId(txHash); + const { meta, minedBlockId, isValid } = precomputed.get(txHashStr)!; const preProtectedSlot = this.#indices.getProtectionSlot(txHashStr); if (minedBlockId) { // Already mined - add directly (protection already set if pre-protected) - await this.#addTx(tx, { mined: minedBlockId }, opts); + await this.#addTx(tx, { mined: minedBlockId }, opts, meta); accepted.push(txHash); } else if (preProtectedSlot !== undefined) { // Pre-protected and not mined - add as protected (bypass validation) - await this.#addTx(tx, { protected: preProtectedSlot }, opts); + await this.#addTx(tx, { protected: preProtectedSlot }, opts, meta); accepted.push(txHash); + } else if (!isValid) { + // Failed pre-computed validation + rejected.push(txHash); } else { - // Regular pending tx - validate and run pre-add rules + // Regular pending tx - run pre-add rules using pre-computed metadata const result = await this.#tryAddRegularPendingTx( tx, + meta, opts, poolAccess, acceptedPending, @@ -227,8 +254,6 @@ export class TxPoolV2Impl { ); if (result.status === 'accepted') { acceptedPending.add(txHashStr); - } else if (result.status === 'rejected') { - rejected.push(txHash); } else { ignored.push(txHash); } @@ -259,27 +284,21 @@ export class TxPoolV2Impl { return { accepted, ignored, rejected, ...(errors.size > 0 ? { errors } : {}) }; } - /** Validates and adds a regular pending tx. Returns status. */ + /** Adds a validated pending tx, running pre-add rules and evicting conflicts. */ async #tryAddRegularPendingTx( tx: Tx, + precomputedMeta: TxMetaData, opts: { source?: string }, poolAccess: PreAddPoolAccess, acceptedPending: Set, ignored: TxHash[], errors: Map, preAddContext?: PreAddContext, - ): Promise<{ status: 'accepted' | 'ignored' | 'rejected' }> { - const txHash = tx.getTxHash(); - const txHashStr = txHash.toString(); - - // Build metadata and validate using metadata - const meta = await buildTxMetaData(tx); - if (!(await this.#validateMeta(meta))) { - return { status: 'rejected' }; - } + ): Promise<{ status: 'accepted' | 'ignored' }> { + const txHashStr = tx.getTxHash().toString(); // Run pre-add rules - const preAddResult = await this.#evictionManager.runPreAddRules(meta, poolAccess, preAddContext); + const preAddResult = await this.#evictionManager.runPreAddRules(precomputedMeta, poolAccess, preAddContext); if (preAddResult.shouldIgnore) { this.#log.debug(`Ignoring tx ${txHashStr}: ${preAddResult.reason?.message ?? 'unknown reason'}`); @@ -323,7 +342,7 @@ export class TxPoolV2Impl { } // Add the transaction - await this.#addTx(tx, 'pending', opts); + await this.#addTx(tx, 'pending', opts, precomputedMeta); return { status: 'accepted' }; } @@ -765,9 +784,10 @@ export class TxPoolV2Impl { tx: Tx, state: 'pending' | { protected: SlotNumber } | { mined: L2BlockId }, opts: { source?: string } = {}, + precomputedMeta?: TxMetaData, ): Promise { const txHashStr = tx.getTxHash().toString(); - const meta = await buildTxMetaData(tx); + const meta = precomputedMeta ?? (await buildTxMetaData(tx)); meta.receivedAt = this.#dateProvider.now(); await this.#txsDB.set(txHashStr, tx.toBuffer());