diff --git a/CHANGELOG.md b/CHANGELOG.md index 98961e6228..227d9ff6b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Add disaster recovery for sequencer + - Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057) + - Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061) - Node pruning support. [#2984](https://github.com/evstack/ev-node/pull/2984) - Two different sort of pruning implemented: _Classic pruning_ (`all`): prunes given `HEAD-n` blocks from the databases, including store metadatas. diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index df313d6f68..d04a356622 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -74,6 +74,10 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return 0, nil +} + func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { testHeight := uint64(100) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 3185410cda..a92a9eef28 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -299,6 +299,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } +// GetLatestDAHeight returns the latest height available on the DA layer by +// querying the network head. 
+func (c *client) GetLatestDAHeight(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.NetworkHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get DA network head: %w", err) + } + if header == nil { + return 0, fmt.Errorf("DA network head returned nil header") + } + + return header.Height, nil +} + +// RetrieveForcedInclusion retrieves blobs from the forced inclusion namespace at the specified height. func (c *client) RetrieveForcedInclusion(ctx context.Context, height uint64) datypes.ResultRetrieve { if !c.hasForcedNamespace { diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 9b0ad5529e..a6a2253084 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -163,6 +163,9 @@ func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context if result.Code == datypes.StatusNotFound { r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height") + syncFetchedBlocks[h] = &BlockData{ + Timestamp: result.Timestamp, + } continue } diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7e..1e9f6cedee 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,9 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // GetLatestDAHeight returns the latest height available on the DA layer. + GetLatestDAHeight(ctx context.Context) (uint64, error) + + // Namespace accessors. 
GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 45fae2e863..4d946a8b74 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -123,6 +123,20 @@ func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs [] return res, nil } +func (t *tracedClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetLatestDAHeight") + defer span.End() + + height, err := t.inner.GetLatestDAHeight(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return 0, err + } + span.SetAttributes(attribute.Int64("da.latest_height", int64(height))) + return height, nil +} + func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } func (t *tracedClient) GetForcedInclusionNamespace() []byte { diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ea01c9e425..de32532a31 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -54,10 +54,11 @@ func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs } return nil, nil } -func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } -func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } -func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } -func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m 
*mockFullClient) HasForcedInclusionNamespace() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/block/internal/syncing/block_syncer.go b/block/internal/syncing/block_syncer.go index e48dd46771..e65279a9df 100644 --- a/block/internal/syncing/block_syncer.go +++ b/block/internal/syncing/block_syncer.go @@ -21,5 +21,5 @@ type BlockSyncer interface { ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error // VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled. - VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error + VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index a408c5b7c3..349b5c97dd 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -748,9 +748,18 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve return err } - // Verify forced inclusion transactions if configured - if event.Source == common.SourceDA { - if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil { + // Verify forced inclusion transactions if configured. + // The check is actually only performed on DA-only enabled nodes, or P2P nodes catching up with the HEAD. + // P2P nodes at HEAD aren't actually able to verify forced inclusion txs as DA inclusion happens later (so DA hints are not available). This is a known limitation described in the ADR. 
+ if event.Source == common.SourceDA || event.DaHeightHints != [2]uint64{0, 0} { + currentDAHeight := currentState.DAHeight + if event.DaHeightHints[0] > currentDAHeight { + currentDAHeight = event.DaHeightHints[0] + } else if event.DaHeightHints[1] > currentDAHeight { + currentDAHeight = event.DaHeightHints[1] + } + + if err := s.VerifyForcedInclusionTxs(ctx, currentDAHeight, data); err != nil { s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") if errors.Is(err, errMaliciousProposer) { // remove header as da included from cache @@ -770,9 +779,49 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // Update DA height if needed // This height is only updated when a height is processed from DA as P2P - // events do not contain DA height information + // events do not contain DA height information. + // + // When a sequencer restarts after extended downtime, it produces "catch-up" + // blocks containing forced inclusion transactions from missed DA epochs and + // submits them to DA at the current (much higher) DA height. This creates a + // gap between the state's DAHeight (tracking forced inclusion epoch progress) + // and event.DaHeight (the DA submission height). + // + // If we jump state.DAHeight directly to event.DaHeight, subsequent calls to + // VerifyForcedInclusionTxs would check the wrong epoch (the submission epoch + // instead of the next forced-inclusion epoch), causing valid catch-up blocks + // to be incorrectly flagged as malicious. + // + // To handle this, when the gap exceeds one DA epoch, we advance DAHeight by + // exactly one epoch per block. This lets the forced inclusion verifier check + // the correct epoch for each catch-up block. Once the sequencer finishes + // catching up and the gap closes, DAHeight converges to event.DaHeight. 
if event.DaHeight > newState.DAHeight { - newState.DAHeight = event.DaHeight + epochSize := s.genesis.DAEpochForcedInclusion + gap := event.DaHeight - newState.DAHeight + + if epochSize > 0 && gap > epochSize { + // Large gap detected — likely catch-up blocks from a restarted sequencer. + // Advance DAHeight by one epoch to keep forced inclusion verification + // aligned with the epoch the sequencer is replaying. + _, epochEnd, _ := types.CalculateEpochBoundaries( + newState.DAHeight, s.genesis.DAStartHeight, epochSize, + ) + nextEpochStart := epochEnd + 1 + if nextEpochStart > event.DaHeight { + // Shouldn't happen, but clamp to event.DaHeight as a safety net. + nextEpochStart = event.DaHeight + } + s.logger.Debug(). + Uint64("current_da_height", newState.DAHeight). + Uint64("event_da_height", event.DaHeight). + Uint64("advancing_to", nextEpochStart). + Uint64("gap", gap). + Msg("large DA height gap detected (sequencer catch-up), advancing DA height by one epoch") + newState.DAHeight = nextEpochStart + } else { + newState.DAHeight = event.DaHeight + } } batch, err := s.store.NewBatch(ctx) @@ -971,7 +1020,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 { // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions // to future blocks (smoothing). This is legitimate behavior within an epoch. // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later). 
-func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { if s.fiRetriever == nil { return nil } @@ -981,7 +1030,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type s.updateDynamicGracePeriod(blockFullness) // Retrieve forced inclusion transactions from DA for current epoch - forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentState.DAHeight) + forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, daHeight) if err != nil { if errors.Is(err, da.ErrForceInclusionNotConfigured) { s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification") @@ -1068,10 +1117,10 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type effectiveGracePeriod := s.getEffectiveGracePeriod() graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion) - if currentState.DAHeight > graceBoundary { + if daHeight > graceBoundary { maliciousTxs = append(maliciousTxs, pending) s.logger.Warn(). - Uint64("current_da_height", currentState.DAHeight). + Uint64("current_da_height", daHeight). Uint64("epoch_end", pending.EpochEnd). Uint64("grace_boundary", graceBoundary). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). @@ -1081,7 +1130,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type Msg("forced inclusion transaction past grace boundary - marking as malicious") } else { remainingPending = append(remainingPending, pending) - if currentState.DAHeight > pending.EpochEnd { + if daHeight > pending.EpochEnd { txsInGracePeriod++ } } @@ -1105,7 +1154,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type effectiveGracePeriod := s.getEffectiveGracePeriod() s.logger.Error(). 
Uint64("height", data.Height()). - Uint64("current_da_height", currentState.DAHeight). + Uint64("current_da_height", daHeight). Int("malicious_count", len(maliciousTxs)). Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod). Uint64("effective_grace_periods", effectiveGracePeriod). @@ -1125,7 +1174,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type s.logger.Info(). Uint64("height", data.Height()). - Uint64("da_height", currentState.DAHeight). + Uint64("da_height", daHeight). Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight). Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight). Int("included_count", includedCount). diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 3c03992fce..6cb34901c2 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -426,11 +426,8 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) { data := makeData(gen.ChainID, 1, 1) data.Txs[0] = types.Tx(dataBin) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since all forced txs are included - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -504,11 +501,8 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { data.Txs[0] = types.Tx([]byte("regular_tx_1")) data.Txs[1] = types.Tx([]byte("regular_tx_2")) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since forced tx blob may be legitimately deferred within the epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -517,11 +511,10 @@ func 
TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { }).Once() // Move to next epoch but still within grace period - currentState.DAHeight = 1 // Move to epoch end (epoch was [0, 0]) data2 := makeData(gen.ChainID, 2, 1) data2.Txs[0] = []byte("regular_tx_3") - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), 1, data2) require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it // Mock DA for height 2 to return no forced inclusion transactions @@ -530,11 +523,10 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) { }).Once() // Now move past grace boundary - should fail if tx still not included - currentState.DAHeight = 2 // Move past grace boundary (graceBoundary = 0 + 1*1 = 1) data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] = types.Tx([]byte("regular_tx_4")) - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3) + err = s.VerifyForcedInclusionTxs(t.Context(), 2, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -611,11 +603,8 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { data.Txs[1] = types.Tx([]byte("regular_tx")) // dataBin2 is missing - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since dataBin2 may be legitimately deferred within the epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) // Mock DA for next epoch to return no forced inclusion transactions @@ -624,12 +613,11 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { }).Once() // Move to DAHeight=1 (still within grace period since graceBoundary = 0 + 1*1 = 1) - currentState.DAHeight = 1 data2 := makeData(gen.ChainID, 2, 1) data2.Txs[0] = types.Tx([]byte("regular_tx_3")) // Verify - 
should pass since we're at the grace boundary, not past it - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), 1, data2) require.NoError(t, err) // Mock DA for height 2 (when we move to DAHeight 2) @@ -640,11 +628,10 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) { // Now simulate moving past grace boundary - should fail if dataBin2 still not included // With basePeriod=1 and DAEpochForcedInclusion=1, graceBoundary = 0 + (1*1) = 1 // So we need DAHeight > 1 to trigger the error - currentState.DAHeight = 2 // Move past grace boundary data3 := makeData(gen.ChainID, 3, 1) data3.Txs[0] = types.Tx([]byte("regular_tx_4")) - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3) + err = s.VerifyForcedInclusionTxs(t.Context(), 2, data3) require.Error(t, err) require.Contains(t, err.Error(), "sequencer is malicious") require.Contains(t, err.Error(), "past grace boundary") @@ -713,11 +700,8 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) { // Create block data data := makeData(gen.ChainID, 1, 2) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since no forced txs to verify - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -778,11 +762,8 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) { // Create block data data := makeData(gen.ChainID, 1, 2) - currentState := s.getLastState() - currentState.DAHeight = 0 - // Verify - should pass since namespace not configured - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data) + err = s.VerifyForcedInclusionTxs(t.Context(), 0, data) require.NoError(t, err) } @@ -867,11 +848,10 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { data1.Txs[0] = types.Tx(dataBin1) data1.Txs[1] = types.Tx([]byte("regular_tx_1")) - 
currentState := s.getLastState() - currentState.DAHeight = 104 + daHeight := uint64(104) // Verify - should pass since dataBin2 can be deferred within epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), daHeight, data1) require.NoError(t, err) // Verify that dataBin2 is now tracked as pending @@ -900,7 +880,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for // Verify - should pass since dataBin2 is now included and clears pending - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2) + err = s.VerifyForcedInclusionTxs(t.Context(), daHeight, data2) require.NoError(t, err) // Verify that pending queue is now empty (dataBin2 was included) @@ -993,11 +973,8 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) { data1 := makeData(gen.ChainID, 1, 1) data1.Txs[0] = types.Tx([]byte("regular_tx_1")) - currentState := s.getLastState() - currentState.DAHeight = 102 - // Verify - should pass, tx can be deferred within epoch - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), 102, data1) require.NoError(t, err) } @@ -1090,9 +1067,6 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) { data1.Txs[0] = types.Tx(dataBin1) data1.Txs[1] = types.Tx(dataBin2) - currentState := s.getLastState() - currentState.DAHeight = 102 // At epoch end - - err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1) + err = s.VerifyForcedInclusionTxs(t.Context(), 102 /* epoch end */, data1) require.NoError(t, err, "smoothing within epoch should be allowed") } diff --git a/block/internal/syncing/tracing.go b/block/internal/syncing/tracing.go index bc43263664..1877886d33 100644 --- a/block/internal/syncing/tracing.go +++ b/block/internal/syncing/tracing.go @@ -85,16 +85,16 @@ func (t *tracedBlockSyncer) 
ValidateBlock(ctx context.Context, currState types.S return err } -func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error { ctx, span := t.tracer.Start(ctx, "BlockSyncer.VerifyForcedInclusion", trace.WithAttributes( attribute.Int64("block.height", int64(data.Height())), - attribute.Int64("da.height", int64(currentState.DAHeight)), + attribute.Int64("da.height", int64(daHeight)), ), ) defer span.End() - err := t.inner.VerifyForcedInclusionTxs(ctx, currentState, data) + err := t.inner.VerifyForcedInclusionTxs(ctx, daHeight, data) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go index 679f3f7a33..b49235871d 100644 --- a/block/internal/syncing/tracing_test.go +++ b/block/internal/syncing/tracing_test.go @@ -21,7 +21,7 @@ type mockBlockSyncer struct { trySyncNextBlockFn func(ctx context.Context, event *common.DAHeightEvent) error applyBlockFn func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error) validateBlockFn func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error - verifyForcedInclusionFn func(ctx context.Context, currentState types.State, data *types.Data) error + verifyForcedInclusionFn func(ctx context.Context, daHeight uint64, data *types.Data) error } func (m *mockBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error { @@ -45,9 +45,9 @@ func (m *mockBlockSyncer) ValidateBlock(ctx context.Context, currState types.Sta return nil } -func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error { +func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, 
daHeight uint64, data *types.Data) error { if m.verifyForcedInclusionFn != nil { - return m.verifyForcedInclusionFn(ctx, currentState, data) + return m.verifyForcedInclusionFn(ctx, daHeight, data) } return nil } @@ -248,7 +248,7 @@ func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) { func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { mock := &mockBlockSyncer{ - verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error { + verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error { return nil }, } @@ -260,11 +260,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { Height: 100, }, } - state := types.State{ - DAHeight: 50, - } - err := syncer.VerifyForcedInclusionTxs(ctx, state, data) + err := syncer.VerifyForcedInclusionTxs(ctx, 50, data) require.NoError(t, err) spans := sr.Ended() @@ -280,7 +277,7 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) { func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { mock := &mockBlockSyncer{ - verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error { + verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error { return errors.New("forced inclusion verification failed") }, } @@ -292,11 +289,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) { Height: 100, }, } - state := types.State{ - DAHeight: 50, - } - err := syncer.VerifyForcedInclusionTxs(ctx, state, data) + err := syncer.VerifyForcedInclusionTxs(ctx, 50, data) require.Error(t, err) spans := sr.Ended() diff --git a/docs/adr/adr-019-forced-inclusion-mechanism.md b/docs/adr/adr-019-forced-inclusion-mechanism.md index ec025222e6..567e449e7c 100644 --- a/docs/adr/adr-019-forced-inclusion-mechanism.md +++ b/docs/adr/adr-019-forced-inclusion-mechanism.md @@ -5,6 +5,7 @@ - 2025-03-24: Initial 
draft - 2025-04-23: Renumbered from ADR-018 to ADR-019 to maintain chronological order. - 2025-11-10: Updated to reflect actual implementation +- 2026-02-23: Added sequencer catch-up mode documentation ## Context @@ -445,6 +446,58 @@ if errors.Is(err, coreda.ErrHeightFromFuture) { } ``` +#### Sequencer Catch-Up Mode + +When a single sequencer comes back online after downtime spanning multiple DA epochs, it enters **catch-up mode** to ensure consistency with base sequencing behavior. + +**Problem**: If the sequencer was offline for several DA epochs, it missed mempool transactions that were submitted during that time. However, forced inclusion transactions were still being posted to DA and processed by full nodes running in base sequencing mode. When the sequencer restarts, it must produce blocks that match what base sequencing would have produced during the downtime. + +**Solution**: The sequencer detects if it has fallen more than one epoch behind the DA head and enters catch-up mode: + +1. **Detection**: On the first epoch fetch after startup, query `GetLatestDAHeight()` to determine the gap +2. **Catch-Up Mode**: If more than one epoch behind, enter catch-up mode: + - Only produce blocks with forced inclusion transactions (no mempool) + - Use DA epoch end timestamps for block timestamps (to match base sequencing) +3. 
**Exit**: When `ErrHeightFromFuture` is encountered (reached DA head), exit catch-up mode and resume normal operation + +**Key Behaviors During Catch-Up**: + +- **No Mempool Transactions**: Only forced inclusion transactions are included in blocks +- **Matching Timestamps**: Block timestamps are derived from DA epoch end times to match base sequencing +- **Checkpoint Persistence**: Progress is tracked via checkpoint to handle crashes during catch-up +- **Single Check**: The `GetLatestDAHeight()` query is performed only once per sequencer lifecycle + +**Example**: + +``` +Sequencer offline during epochs 100-150 (5 epochs of 10 blocks each) +Full nodes (base sequencing) produced blocks with forced txs only + +Sequencer restarts at epoch 160: +1. Checkpoint DA height: 100 +2. Latest DA height: 160 +3. Missed epochs: 6 (more than 1) +4. Enter catch-up mode + +Catch-up process: +- Epoch 101-110: Produce blocks with forced txs only, use epoch timestamps +- Epoch 111-120: Continue catch-up... +- ... +- Epoch 151-160: Still catching up +- Epoch 161: ErrHeightFromFuture -> exit catch-up mode + +Normal operation resumes: +- Include both forced txs and mempool txs +- Use current timestamps +``` + +**Benefits**: + +- Ensures sequencer produces identical blocks to what base sequencing would have produced +- Maintains consistency across the network regardless of sequencer downtime +- Automatic detection and recovery without operator intervention +- Safe restart after crashes (checkpoint tracks progress) + #### Grace Period for Forced Inclusion The grace period mechanism provides tolerance for chain congestion while maintaining censorship resistance: @@ -686,7 +739,7 @@ based_sequencer = true # Use based sequencer ### Full Node Verification Flow -1. Receive block from DA or P2P +1. Receive block from DA 2. Before applying block: a. Fetch forced inclusion txs from DA at block's DA height (epoch-based) b. 
Build map of transactions in block @@ -699,6 +752,8 @@ based_sequencer = true # Use based sequencer h. If txs within grace period: keep in pending queue, allow block 3. Apply block if verification passes +NOTE: P2P-only nodes do not perform any verification. This is because DA inclusion happens later than block production, and thus DA hints are added later to broadcast blocks. + **Grace Period Example** (with base grace period = 1 epoch, `DAEpochForcedInclusion = 50`): - Forced tx appears in epoch ending at DA height 100 @@ -722,18 +777,6 @@ based_sequencer = true # Use based sequencer Every `DAEpochForcedInclusion` DA blocks -### Security Considerations - -1. **Malicious Proposer Detection**: Full nodes reject blocks missing forced transactions -2. **No Timing Attacks**: Epoch boundaries are deterministic, no time-based logic -3. **Blob Size Limits**: Two-tier size validation prevents DoS - - Absolute limit (1.5MB): Blobs exceeding this are permanently rejected - - Batch limit (`MaxBytes`): Ensures no batch exceeds DA submission limits -4. **Graceful Degradation**: Continues operation if forced inclusion not configured -5. **Height Validation**: Handles "height from future" errors without state corruption -6. **Transaction Preservation**: No valid transactions are lost due to size constraints -7. **Strict MaxBytes Enforcement**: Batches NEVER exceed `req.MaxBytes`, preventing DA layer rejections - **Attack Vectors**: ### Security Considerations @@ -770,15 +813,14 @@ Accepted and Implemented 9. **Transaction Preservation**: All valid transactions are preserved in queues, nothing is lost 10. **Strict MaxBytes Compliance**: Batches never exceed limits, preventing DA submission failures 11. **DA Fault Tolerance**: Grace period prevents false positives during temporary chain congestion +12. **Automatic Recovery**: Sequencer catch-up mode ensures consistency after downtime without operator intervention ### Negative 1. 
**Increased Latency**: Forced transactions subject to epoch boundaries -2. **DA Dependency**: Requires DA layer to support multiple namespaces +2. **DA Dependency**: Requires DA layer to be enabled on nodes for verification 3. **Higher DA Costs**: Users pay DA posting fees for forced inclusion -4. **Additional Complexity**: New component (DA Retriever) and verification logic with grace period tracking -5. **Epoch Configuration**: Requires setting `DAEpochForcedInclusion` in genesis (consensus parameter) -6. **Grace Period Adjustment**: Grace period is dynamically adjusted based on block fullness to balance censorship detection with operational reliability +4. **Epoch Configuration**: Requires setting `DAEpochForcedInclusion` in genesis (consensus parameter) ### Neutral diff --git a/pkg/config/config.go b/pkg/config/config.go index 8e283a81e7..c9f7806916 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -258,9 +258,9 @@ func (d *DAConfig) GetForcedInclusionNamespace() string { // NodeConfig contains all Rollkit specific configuration parameters type NodeConfig struct { // Node mode configuration - Aggregator bool `yaml:"aggregator" comment:"Run node in aggregator mode"` - BasedSequencer bool `yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` - Light bool `yaml:"light" comment:"Run node in light mode"` + Aggregator bool `mapstructure:"aggregator" yaml:"aggregator" comment:"Run node in aggregator mode"` + BasedSequencer bool `mapstructure:"based_sequencer" yaml:"based_sequencer" comment:"Run node with based sequencer (fetches transactions only from DA forced inclusion namespace). Requires aggregator mode to be enabled."` + Light bool `mapstructure:"light" yaml:"light" comment:"Run node in light mode"` // Block management configuration BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Block time (duration). 
Examples: \"500ms\", \"1s\", \"5s\", \"1m\", \"2m30s\", \"10m\"."` diff --git a/pkg/sequencers/single/sequencer.go b/pkg/sequencers/single/sequencer.go index 228bde2791..0100812565 100644 --- a/pkg/sequencers/single/sequencer.go +++ b/pkg/sequencers/single/sequencer.go @@ -21,11 +21,19 @@ import ( "github.com/evstack/ev-node/pkg/genesis" seqcommon "github.com/evstack/ev-node/pkg/sequencers/common" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) // ErrInvalidId is returned when the chain id is invalid var ErrInvalidId = errors.New("invalid chain id") +// Catch-up state machine states +const ( + catchUpUnchecked int32 = iota // haven't checked DA height + catchUpInProgress // replaying missed DA epochs + catchUpDone // caught up or never behind +) + var _ coresequencer.Sequencer = (*Sequencer)(nil) // Sequencer implements core sequencing interface @@ -51,6 +59,11 @@ type Sequencer struct { // Cached forced inclusion transactions from the current epoch cachedForcedInclusionTxs [][]byte + + // catchUpState tracks catch-up lifecycle (see constants above) + catchUpState atomic.Int32 + // currentDAEndTime is the DA epoch end timestamp, used during catch-up + currentDAEndTime time.Time } // NewSequencer creates a new Single Sequencer @@ -87,7 +100,7 @@ func NewSequencer( return nil, fmt.Errorf("failed to load batch queue from DB: %w", err) } - // Load checkpoint from DB, or initialize if none exists + // Load checkpoint from DB or initialize checkpoint, err := s.checkpointStore.Load(loadCtx) if err != nil { if errors.Is(err, seqcommon.ErrCheckpointNotFound) { @@ -166,8 +179,8 @@ func (c *Sequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Submit return &coresequencer.SubmitBatchTxsResponse{}, nil } -// GetNextBatch implements sequencing.Sequencer. -// It gets the next batch of transactions and fetch for forced included transactions. +// GetNextBatch gets the next batch. 
During catch-up, only forced inclusion txs +// are returned to match based sequencing behavior. func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) { if !c.isValid(req.Id) { return nil, ErrInvalidId @@ -175,15 +188,14 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB daHeight := c.GetDAHeight() - // checkpoint init path, only hit when sequencer is bootstrapping + // checkpoint init path (sequencer bootstrapping) if daHeight > 0 && c.checkpoint.DAHeight == 0 { c.checkpoint = &seqcommon.Checkpoint{ DAHeight: daHeight, TxIndex: 0, } - // override forced inclusion retriever, as the da start height have been updated - // Stop the old retriever first + // Reinitialize forced inclusion retriever with updated DA start height if c.fiRetriever != nil { c.fiRetriever.Stop() } @@ -197,7 +209,6 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB if err != nil { return nil, err } - daHeight = daEndHeight } @@ -208,21 +219,29 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB forcedTxs = c.cachedForcedInclusionTxs[c.checkpoint.TxIndex:] } - // Get mempool transactions from queue - mempoolBatch, err := c.queue.Next(ctx) - if err != nil { - return nil, err + // Skip mempool during catch-up to match based sequencing + var mempoolBatch *coresequencer.Batch + if c.catchUpState.Load() != catchUpInProgress { + var err error + mempoolBatch, err = c.queue.Next(ctx) + if err != nil { + return nil, err + } + } else { + mempoolBatch = &coresequencer.Batch{} + c.logger.Debug(). + Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Int("forced_txs", len(forcedTxs)). + Msg("catch-up mode: skipping mempool transactions") } - // Build combined tx list for filtering + // Build combined tx list allTxs := make([][]byte, 0, len(forcedTxs)+len(mempoolBatch.Transactions)) allTxs = append(allTxs, forcedTxs...) 
allTxs = append(allTxs, mempoolBatch.Transactions...) - - // Track where forced txs end and mempool txs begin forcedTxCount := len(forcedTxs) - // Get current gas limit from execution layer + // Get gas limit from execution layer var maxGas uint64 info, err := c.executor.GetExecutionInfo(ctx) if err != nil { @@ -235,7 +254,6 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB filterStatuses, err := c.executor.FilterTxs(ctx, allTxs, req.MaxBytes, maxGas, forcedTxCount > 0) if err != nil { c.logger.Warn().Err(err).Msg("failed to filter transactions, proceeding with unfiltered") - // Fall back to using all txs as OK filterStatuses = make([]execution.FilterStatus, len(allTxs)) for i := range filterStatuses { filterStatuses[i] = execution.FilterOK @@ -298,18 +316,14 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB // Advance TxIndex by the number of consumed forced transactions c.checkpoint.TxIndex += forcedTxConsumedCount - // Check if we've consumed all transactions from the epoch if c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) { // All forced txs were consumed (OK or Remove), move to next DA epoch c.checkpoint.DAHeight = daHeight + 1 c.checkpoint.TxIndex = 0 c.cachedForcedInclusionTxs = nil - - // Update the global DA height c.SetDAHeight(c.checkpoint.DAHeight) } - // Persist checkpoint if err := c.checkpointStore.Save(ctx, c.checkpoint); err != nil { return nil, fmt.Errorf("failed to save checkpoint: %w", err) } @@ -318,6 +332,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Uint64("consumed_count", forcedTxConsumedCount). Uint64("checkpoint_tx_index", c.checkpoint.TxIndex). Uint64("checkpoint_da_height", c.checkpoint.DAHeight). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). 
Msg("updated checkpoint after processing forced inclusion transactions") } @@ -326,11 +341,21 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batchTxs = append(batchTxs, validForcedTxs...) batchTxs = append(batchTxs, validMempoolTxs...) + // During catch-up, use the DA epoch end time as the batch timestamp; subtract 1ms per remaining forced tx so timestamps increase monotonically as the epoch is consumed + timestamp := time.Now() + if c.catchUpState.Load() == catchUpInProgress && !c.currentDAEndTime.IsZero() { + var remainingForcedTxs uint64 + if len(c.cachedForcedInclusionTxs) > 0 { + remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex + } + timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond) + } + return &coresequencer.GetNextBatchResponse{ Batch: &coresequencer.Batch{ Transactions: batchTxs, }, - Timestamp: time.Now(), + Timestamp: timestamp, BatchData: req.LastBatchData, }, nil } @@ -374,13 +399,25 @@ func (c *Sequencer) GetDAHeight() uint64 { return c.daHeight.Load() } -// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint +// isCatchingUp returns whether the sequencer is in catch-up mode. +func (c *Sequencer) isCatchingUp() bool { + return c.catchUpState.Load() == catchUpInProgress +} + +// fetchNextDAEpoch fetches transactions from the next DA epoch. It also +// updates catch-up state: entering catch-up if behind, exiting when reaching DA head. func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) { currentDAHeight := c.checkpoint.DAHeight + // Determine catch-up state before the (potentially expensive) epoch fetch. + // This is done once per sequencer lifecycle — subsequent catch-up exits are + // handled by ErrHeightFromFuture below. + c.updateCatchUpState(ctx) + c.logger.Debug(). Uint64("da_height", currentDAHeight). Uint64("tx_index", c.checkpoint.TxIndex). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). 
Msg("fetching forced inclusion transactions from DA") forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight) @@ -389,17 +426,29 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint c.logger.Debug(). Uint64("da_height", currentDAHeight). Msg("DA height from future, waiting for DA to produce block") + + if c.catchUpState.Load() == catchUpInProgress { + c.logger.Info().Uint64("da_height", currentDAHeight). + Msg("catch-up complete: reached DA head, resuming normal sequencing") + c.catchUpState.Store(catchUpDone) + } + return 0, nil } else if errors.Is(err, block.ErrForceInclusionNotConfigured) { - // Forced inclusion not configured, continue without forced txs c.cachedForcedInclusionTxs = [][]byte{} + c.catchUpState.Store(catchUpDone) return 0, nil } return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err) } - // Validate and filter transactions + // Store DA epoch end time for timestamp usage during catch-up + if !forcedTxsEvent.Timestamp.IsZero() { + c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC() + } + + // Filter out oversized transactions validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs)) skippedTxs := 0 for _, tx := range forcedTxsEvent.Txs { @@ -420,10 +469,67 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint Int("skipped_tx_count", skippedTxs). Uint64("da_height_start", forcedTxsEvent.StartDaHeight). Uint64("da_height_end", forcedTxsEvent.EndDaHeight). + Bool("catching_up", c.catchUpState.Load() == catchUpInProgress). Msg("fetched forced inclusion transactions from DA") - // Cache the transactions c.cachedForcedInclusionTxs = validTxs return forcedTxsEvent.EndDaHeight, nil } + +// updateCatchUpState checks if catch-up is needed by comparing checkpoint +// DA height with latest DA height. Runs once per sequencer lifecycle. +// If more than one epoch behind, enters catch-up mode (forced txs only, no mempool). 
+func (c *Sequencer) updateCatchUpState(ctx context.Context) { + if c.catchUpState.Load() != catchUpUnchecked { + return + } + // Optimistically mark as done; overridden to catchUpInProgress below if + // catch-up is actually needed. + c.catchUpState.Store(catchUpDone) + + epochSize := c.genesis.DAEpochForcedInclusion + if epochSize == 0 { + return + } + + currentDAHeight := c.checkpoint.DAHeight + daStartHeight := c.genesis.DAStartHeight + + latestDAHeight, err := c.daClient.GetLatestDAHeight(ctx) + if err != nil { + c.logger.Warn().Err(err). + Msg("failed to get latest DA height for catch-up detection, skipping check") + return + } + + // At head, no catch-up needed + if latestDAHeight <= currentDAHeight { + return + } + + // Calculate missed epochs + currentEpoch := types.CalculateEpochNumber(currentDAHeight, daStartHeight, epochSize) + latestEpoch := types.CalculateEpochNumber(latestDAHeight, daStartHeight, epochSize) + missedEpochs := latestEpoch - currentEpoch + + if missedEpochs <= 1 { + c.logger.Debug(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Msg("sequencer within one epoch of DA head, no catch-up needed") + return + } + + // More than one epoch behind - enter catch-up mode + c.catchUpState.Store(catchUpInProgress) + c.logger.Warn(). + Uint64("checkpoint_da_height", currentDAHeight). + Uint64("latest_da_height", latestDAHeight). + Uint64("current_epoch", currentEpoch). + Uint64("latest_epoch", latestEpoch). + Uint64("missed_epochs", missedEpochs). 
+ Msg("entering catch-up mode: replaying missed epochs with forced inclusion txs only") +} diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index 6a4b114be6..a88dce3b26 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -365,7 +365,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) // Create in-memory datastore db := ds.NewMapDatastore() @@ -381,6 +381,9 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create forced inclusion txs that are 50 and 60 bytes forcedTx1 := make([]byte, 50) forcedTx2 := make([]byte, 60) @@ -455,7 +458,7 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -469,6 +472,9 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // Create 
forced inclusion txs where combined they exceed maxBytes forcedTx1 := make([]byte, 100) forcedTx2 := make([]byte, 80) // This would be deferred @@ -535,7 +541,7 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) { func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -549,6 +555,9 @@ func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First call returns large forced txs largeForcedTx1, largeForcedTx2 := make([]byte, 75), make([]byte, 75) mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ @@ -873,7 +882,7 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) { func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { ctx := context.Background() - logger := zerolog.New(zerolog.NewConsoleWriter()) + logger := zerolog.New(zerolog.NewTestWriter(t)) db := ds.NewMapDatastore() defer db.Close() @@ -887,6 +896,10 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 101 — close to sequencer start (100), no catch-up needed. + // Use Maybe() since two sequencer instances share this mock. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(101), nil).Maybe() + // Create forced inclusion txs at DA height 100 // Use sizes that all fit in one batch to test checkpoint advancing forcedTx1 := make([]byte, 50) @@ -986,6 +999,9 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + // DA head is at 100 — same as sequencer start, no catch-up needed + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Maybe() + // First DA epoch returns empty transactions mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}, @@ -1224,6 +1240,881 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { // preserves any transactions that weren't even processed yet due to maxBytes limits. // // This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario. +func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at height 105 — sequencer starts at 100 with epoch size 1, + // so it has missed 5 epochs (>1), triggering catch-up. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // DA epoch at height 100 + oldTimestamp := time.Now().Add(-10 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-tx-1")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool transaction + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx-1")}}, + }) + require.NoError(t, err) + + assert.False(t, seq.isCatchingUp(), "should not be catching up initially") + + // First GetNextBatch — DA head is far ahead, should enter catch-up + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp.Batch) + + assert.True(t, seq.isCatchingUp(), "should be catching up after detecting epoch gap") + + // During catch-up, batch should contain only forced inclusion tx, no mempool tx + assert.Equal(t, 1, len(resp.Batch.Transactions), "should have only forced inclusion tx during catch-up") + assert.Equal(t, []byte("forced-tx-1"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + 
mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — sequencer starts at 100 with epoch size 1, + // so it has missed multiple epochs, triggering catch-up. + // Called once on first fetchNextDAEpoch; subsequent fetches skip the check. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: two forced txs + oldTimestamp := time.Now().Add(-5 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp}, + Data: [][]byte{[]byte("forced-1"), []byte("forced-2")}, + }).Once() + + // Epoch at height 101: one forced tx + oldTimestamp2 := time.Now().Add(-4 * time.Minute) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: oldTimestamp2}, + Data: [][]byte{[]byte("forced-3")}, + }).Once() + + // Epoch at height 102: from the future (head reached during replay) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit several mempool transactions + for i := 0; i < 5; i++ { + _, err = seq.SubmitBatchTxs(ctx, 
coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + } + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch (epoch 100): only forced txs + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + + for _, tx := range resp1.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 2, len(resp1.Batch.Transactions), "should have 2 forced txs from epoch 100") + + // Second batch (epoch 101): only forced txs + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + + for _, tx := range resp2.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-tx"), tx, "mempool tx should not appear during catch-up") + } + assert.Equal(t, 1, len(resp2.Batch.Transactions), "should have 1 forced tx from epoch 101") +} + +func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch at height 100: timestamp 5 minutes ago + epochTimestamp := time.Now().Add(-5 * time.Minute).UTC() + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), 
forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.True(t, seq.isCatchingUp(), "should be in catch-up mode") + + // During catch-up, the timestamp should be the DA epoch end time, not time.Now() + assert.Equal(t, epochTimestamp, resp.Timestamp, + "catch-up batch timestamp should match DA epoch timestamp") +} + +func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: old (catch-up) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: 
[][]byte{[]byte("forced-old")}, + }).Once() + + // Epoch 101: fetched during catch-up, but returns HeightFromFuture to exit catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First batch: catch-up (old epoch 100) + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during old epoch") + assert.Equal(t, 1, len(resp1.Batch.Transactions), "catch-up: only forced tx") + assert.Equal(t, []byte("forced-old"), resp1.Batch.Transactions[0]) + + // Second batch: epoch 101 returns HeightFromFuture — should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should have exited catch-up after reaching DA head") + + // Should include mempool tx now (no forced txs available) + hasMempoolTx := false + for _, tx := range resp2.Batch.Transactions { + if bytes.Equal(tx, []byte("mempool-tx")) { + hasMempoolTx = true + } + } + assert.True(t, hasMempoolTx, "should contain mempool tx after exiting catch-up") +} + +func TestSequencer_CatchUp_HeightFromFutureExitsCatchUp(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer 
db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: success, fetched during catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + // Epoch 101: from the future — DA head reached, exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call: fetches epoch 100, enters catch-up via epoch gap detection + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 1, len(resp1.Batch.Transactions)) + + // Second call: epoch 101 is from the future, should exit catch-up + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), 
"should exit catch-up when DA returns HeightFromFuture") + // No forced txs available, batch is empty + assert.Equal(t, 0, len(resp2.Batch.Transactions)) +} + +func TestSequencer_CatchUp_NoCatchUpWhenRecentEpoch(t *testing.T) { + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 100 — sequencer starts at 100 with epoch size 1, + // so it is within the same epoch (0 missed). No catch-up. + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(100), nil).Once() + + // Epoch at height 100: current epoch + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()}, + Data: [][]byte{[]byte("forced-tx")}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit a mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should NOT 
be catching up when within one epoch of DA head") + + // Should have both forced and mempool txs (normal operation) + assert.Equal(t, 2, len(resp.Batch.Transactions), "should have forced + mempool tx in normal mode") +} + +func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { + // Simulates a sequencer that missed 3 DA epochs and must replay them all + // before resuming normal operation. + ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 106 — sequencer starts at 100 with epoch size 1, + // so it has missed 6 epochs (>1), triggering catch-up. + // Called once on first fetchNextDAEpoch. 
+ mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(106), nil).Once() + + // 3 old epochs (100, 101, 102) — all with timestamps far in the past + for h := uint64(100); h <= 102; h++ { + ts := time.Now().Add(-time.Duration(103-h) * time.Minute) // older epochs further in the past + txData := []byte("forced-from-epoch-" + string(rune('0'+h-100))) + mockDA.MockClient.On("Retrieve", mock.Anything, h, forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: ts}, + Data: [][]byte{txData}, + }).Once() + } + + // Epoch 103: returns HeightFromFuture — DA head reached, exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(103), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Once() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool txs + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-1"), []byte("mempool-2")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Process the 3 old epochs — all should be catch-up (no mempool) + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during epoch %d", 100+i) + assert.Equal(t, 1, len(resp.Batch.Transactions), + "epoch %d: should have exactly 1 forced tx", 100+i) + + for _, tx := range resp.Batch.Transactions { + assert.NotEqual(t, []byte("mempool-1"), tx, "no mempool during 
catch-up epoch %d", 100+i) + assert.NotEqual(t, []byte("mempool-2"), tx, "no mempool during catch-up epoch %d", 100+i) + } + } + + // DA height should have advanced through the 3 old epochs + assert.Equal(t, uint64(103), seq.GetDAHeight(), "DA height should be at 103 after replaying 3 epochs") + + // Next batch: epoch 103 returns HeightFromFuture — should exit catch-up and include mempool + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should have exited catch-up at DA head") + + hasMempoolTx := false + for _, tx := range resp4.Batch.Transactions { + if bytes.Equal(tx, []byte("mempool-1")) || bytes.Equal(tx, []byte("mempool-2")) { + hasMempoolTx = true + } + } + assert.True(t, hasMempoolTx, "should include mempool txs after exiting catch-up") +} + +func TestSequencer_CatchUp_NoForcedInclusionConfigured(t *testing.T) { + // When forced inclusion is not configured, catch-up should never activate. + // GetLatestDAHeight should NOT be called because DAEpochForcedInclusion == 0 + // causes updateCatchUpState to bail out early. 
+ ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + // No forced inclusion namespace configured + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 0, // no epoch-based forced inclusion + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Submit mempool tx + _, err = seq.SubmitBatchTxs(ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte("test-chain"), + Batch: &coresequencer.Batch{Transactions: [][]byte{[]byte("mempool-tx")}}, + }) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.False(t, seq.isCatchingUp(), "should never catch up when forced inclusion not configured") + assert.Equal(t, 1, len(resp.Batch.Transactions)) + assert.Equal(t, []byte("mempool-tx"), resp.Batch.Transactions[0]) +} + +func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { + // Verify that the checkpoint (DA epoch tracking) advances correctly during catch-up. 
+ ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is at 105 — multiple epochs ahead, triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(105), nil).Once() + + // Epoch 100: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-5 * time.Minute)}, + Data: [][]byte{[]byte("tx-a"), []byte("tx-b")}, + }).Once() + + // Epoch 101: old + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now().Add(-4 * time.Minute)}, + Data: [][]byte{[]byte("tx-c")}, + }).Once() + + // Epoch 102: from the future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + // Initial checkpoint + assert.Equal(t, uint64(100), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + 
LastBatchData: nil, + } + + // Process epoch 100 + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 2, len(resp1.Batch.Transactions)) + + // Checkpoint should advance to epoch 101 + assert.Equal(t, uint64(101), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(101), seq.GetDAHeight()) + + // Process epoch 101 + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + + // Checkpoint should advance to epoch 102 + assert.Equal(t, uint64(102), seq.checkpoint.DAHeight) + assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) + assert.Equal(t, uint64(102), seq.GetDAHeight()) +} + +func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { + // When a single DA epoch has more forced txs than fit in one block, + // catch-up must produce strictly monotonic timestamps across the + // resulting blocks. This uses the same jitter scheme as the based + // sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms). 
+ ctx := context.Background() + logger := zerolog.New(zerolog.NewTestWriter(t)) + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + // DA head is far ahead — triggers catch-up + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch at height 100: 3 forced txs, each 100 bytes + epochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + tx1 := make([]byte, 100) + tx2 := make([]byte, 100) + tx3 := make([]byte, 100) + copy(tx1, "forced-tx-1") + copy(tx2, "forced-tx-2") + copy(tx3, "forced-tx-3") + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epochTimestamp}, + Data: [][]byte{tx1, tx2, tx3}, + }).Once() + + // Epoch at height 101: single tx (to verify cross-epoch monotonicity) + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 10, 0, time.UTC) // 10 seconds later + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-4")}, + }).Once() + + // Epoch 102: future — exits catch-up + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + // Custom executor: 
only 1 tx fits per block (gas-limited) + mockExec := mocks.NewMockExecutor(t) + mockExec.On("GetExecutionInfo", mock.Anything).Return(execution.ExecutionInfo{MaxGas: 1000000}, nil).Maybe() + mockExec.On("FilterTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) []execution.FilterStatus { + result := make([]execution.FilterStatus, len(txs)) + // Only first tx fits, rest are postponed + for i := range result { + if i == 0 { + result[i] = execution.FilterOK + } else { + result[i] = execution.FilterPostpone + } + } + return result + }, + nil, + ).Maybe() + + seq, err := NewSequencer( + logger, + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + mockExec, + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // Produce 3 blocks from epoch 100 (1 tx each due to gas filter) + var timestamps []time.Time + for i := 0; i < 3; i++ { + resp, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should be catching up during block %d", i) + assert.Equal(t, 1, len(resp.Batch.Transactions), "block %d: exactly 1 forced tx", i) + timestamps = append(timestamps, resp.Timestamp) + } + + // All 3 timestamps must be strictly monotonically increasing + for i := 1; i < len(timestamps); i++ { + assert.True(t, timestamps[i].After(timestamps[i-1]), + "timestamp[%d] (%v) must be strictly after timestamp[%d] (%v)", + i, timestamps[i], i-1, timestamps[i-1]) + } + + // Verify exact jitter values: + // Block 0: 3 txs total, 1 consumed → 2 remaining → T - 2ms + // Block 1: 1 consumed → 1 remaining → T - 1ms + // Block 2: 1 consumed → 0 remaining → T + assert.Equal(t, epochTimestamp.Add(-2*time.Millisecond), timestamps[0], "block 0: T - 2ms") + assert.Equal(t, 
epochTimestamp.Add(-1*time.Millisecond), timestamps[1], "block 1: T - 1ms") + assert.Equal(t, epochTimestamp, timestamps[2], "block 2: T (exact epoch end time)") + + // Block from epoch 101 should also be monotonically after epoch 100's last block + resp4, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp(), "should still be catching up") + assert.Equal(t, 1, len(resp4.Batch.Transactions)) + assert.True(t, resp4.Timestamp.After(timestamps[2]), + "epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)", + resp4.Timestamp, timestamps[2]) + assert.Equal(t, epoch2Timestamp, resp4.Timestamp, "single-tx epoch gets exact DA end time") +} + +func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { + // Verify that an empty DA epoch (no forced txs) still advances the + // checkpoint and updates currentDAEndTime so subsequent epochs get + // correct timestamps. + ctx := context.Background() + + db := ds.NewMapDatastore() + defer db.Close() + + mockDA := newMockFullDAClient(t) + forcedInclusionNS := []byte("forced-inclusion") + + mockDA.MockClient.On("GetHeaderNamespace").Return([]byte("header")).Maybe() + mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() + mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() + mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + + mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() + + // Epoch 100: empty (no forced txs) but valid timestamp + emptyEpochTimestamp := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(100), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: emptyEpochTimestamp}, + Data: [][]byte{}, + }).Once() + + // Epoch 101: has a forced tx with a later timestamp + epoch2Timestamp := time.Date(2025, 1, 1, 12, 0, 15, 0, 
time.UTC) + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(101), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: epoch2Timestamp}, + Data: [][]byte{[]byte("forced-tx-after-empty")}, + }).Once() + + // Epoch 102: future + mockDA.MockClient.On("Retrieve", mock.Anything, uint64(102), forcedInclusionNS).Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, + }).Maybe() + + gen := genesis.Genesis{ + ChainID: "test-chain", + DAStartHeight: 100, + DAEpochForcedInclusion: 1, + } + + seq, err := NewSequencer( + zerolog.Nop(), + db, + mockDA, + config.DefaultConfig(), + []byte("test-chain"), + 1000, + gen, + createDefaultMockExecutor(t), + ) + require.NoError(t, err) + + req := coresequencer.GetNextBatchRequest{ + Id: []byte("test-chain"), + MaxBytes: 1000000, + LastBatchData: nil, + } + + // First call processes the empty epoch 100 — empty batch, but checkpoint advances + resp1, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 0, len(resp1.Batch.Transactions), "empty epoch should produce empty batch") + assert.Equal(t, emptyEpochTimestamp, resp1.Timestamp, + "empty epoch batch should use epoch DA end time (0 remaining)") + + // Second call processes epoch 101 — should have later timestamp + resp2, err := seq.GetNextBatch(ctx, req) + require.NoError(t, err) + assert.True(t, seq.isCatchingUp()) + assert.Equal(t, 1, len(resp2.Batch.Transactions)) + assert.True(t, resp2.Timestamp.After(resp1.Timestamp), + "epoch 101 timestamp (%v) must be after empty epoch 100 timestamp (%v)", + resp2.Timestamp, resp1.Timestamp) +} + func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T) { db := ds.NewMapDatastore() logger := zerolog.New(zerolog.NewTestWriter(t)) diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go index 86f81129da..8a5cca5b38 100644 --- 
a/pkg/store/cached_store.go +++ b/pkg/store/cached_store.go @@ -169,12 +169,6 @@ func (cs *CachedStore) PruneBlocks(ctx context.Context, height uint64) error { return nil } -// DeleteStateAtHeight removes the state entry at the given height from the underlying store. -func (cs *CachedStore) DeleteStateAtHeight(ctx context.Context, height uint64) error { - // This value is not cached, so nothing to invalidate. - return cs.Store.DeleteStateAtHeight(ctx, height) -} - // Close closes the underlying store. func (cs *CachedStore) Close() error { cs.ClearCache() diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index 261944a9ed..9bb5606cec 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path/filepath" + "syscall" "testing" "time" @@ -356,10 +357,16 @@ func setupFullNodeWithForceInclusionCheck(t *testing.T, sut *SystemUnderTest, fu "--evnode.da.forced_inclusion_namespace", "forced-inc", // Enables forced inclusion verification "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), - "--evnode.p2p.peers", sequencerP2PAddr, "--evm.engine-url", endpoints.GetFullNodeEngineURL(), "--evm.eth-url", endpoints.GetFullNodeEthURL(), } + // Only add P2P peers if a peer address is provided (disabled for malicious sequencer test) + if sequencerP2PAddr != "" { + fnArgs = append(fnArgs, "--evnode.p2p.peers", sequencerP2PAddr) + } else { + // Disable P2P entirely when no peers provided (for malicious sequencer test) + fnArgs = append(fnArgs, "--evnode.p2p.disabled", "true") + } sut.ExecCmd(evmSingleBinaryPath, fnArgs...) 
sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) } @@ -400,8 +407,6 @@ func setupFullNodeWithForceInclusionCheck(t *testing.T, sut *SystemUnderTest, fu // Note: This test simulates the scenario by having the sequencer configured to // listen to the wrong namespace, while we submit directly to the correct namespace. func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { - t.Skip() // Unskip once https://github.com/evstack/ev-node/pull/2963 is merged - sut := NewSystemUnderTest(t) workDir := t.TempDir() sequencerHome := filepath.Join(workDir, "sequencer") @@ -412,12 +417,9 @@ func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { t.Log("Malicious sequencer started listening to WRONG forced inclusion namespace") t.Log("NOTE: Sequencer listens to 'wrong-namespace', won't see txs on 'forced-inc'") - sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, endpoints.RollkitRPCPort) - t.Logf("Sequencer P2P address: %s", sequencerP2PAddress) - - // Setup full node that will sync from the sequencer and verify forced inclusion - setupFullNodeWithForceInclusionCheck(t, sut, fullNodeHome, sequencerHome, fullNodeJwtSecret, genesisHash, sequencerP2PAddress, endpoints) - t.Log("Full node (syncer) is up and will verify forced inclusion from DA") + // Disable P2P sync - the full node will sync blocks directly from DA. 
+ setupFullNodeWithForceInclusionCheck(t, sut, fullNodeHome, sequencerHome, fullNodeJwtSecret, genesisHash, "", endpoints) + t.Log("Full node (syncer) is up and will verify forced inclusion from DA (P2P disabled)") // Connect to clients seqClient, err := ethclient.Dial(endpoints.GetSequencerEthURL()) @@ -560,3 +562,392 @@ func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) { require.False(t, evm.CheckTxIncluded(seqClient, txForce.Hash()), "Malicious sequencer should NOT have included the forced inclusion transaction") } + +// setDAStartHeightInGenesis modifies the genesis file to set da_start_height. +// This is needed because the based sequencer requires non-zero DAStartHeight, +// and catch-up detection via CalculateEpochNumber also depends on it. +func setDAStartHeightInGenesis(t *testing.T, homeDir string, height uint64) { + t.Helper() + genesisPath := filepath.Join(homeDir, "config", "genesis.json") + data, err := os.ReadFile(genesisPath) + require.NoError(t, err) + + var genesis map[string]interface{} + err = json.Unmarshal(data, &genesis) + require.NoError(t, err) + + genesis["da_start_height"] = height + + newData, err := json.MarshalIndent(genesis, "", " ") + require.NoError(t, err) + + err = os.WriteFile(genesisPath, newData, 0644) + require.NoError(t, err) +} + +// TestEvmSequencerCatchUpBasedSequencerE2E tests that when a sequencer restarts after +// extended downtime (multiple DA epochs), it correctly enters catch-up mode, replays +// missed forced inclusion transactions from DA (matching what a based sequencer would +// produce), and then resumes normal operation. +// +// Test Flow: +// 1. a) Start sequencer +// 1. b) Start sync node (full node) +// 2. Wait for sync node to sync and send txs +// 3. a) Stop sequencer +// 3. b) Stop sync node +// 4. Restart sync node as based sequencer (reuse home directory, add --based_sequencer flag) +// 5. Send txs to force inclusion namespace +// 6. 
Wait for node (based sequencer) to produce one block (must contain those transactions) +// 7. Start sequencer +// 8. Verify blocks are produced and equal to based sequencer blocks +// 9. Stop based sequencer and restart as normal sync node +// 10. Verify they are in sync. +func TestEvmSequencerCatchUpBasedSequencerE2E(t *testing.T) { + sut := NewSystemUnderTest(t) + workDir := t.TempDir() + sequencerHome := filepath.Join(workDir, "sequencer") + fullNodeHome := filepath.Join(workDir, "fullnode") + + // ===== PHASE 1: Setup - Start Sequencer and Sync Node ===== + t.Log("Phase 1: Setup - Start Sequencer and Sync Node") + + jwtSecret, fullNodeJwtSecret, genesisHash, endpoints, _ := setupCommonEVMTest(t, sut, true) + + // Create passphrase and JWT secret files for sequencer + seqPassphraseFile := createPassphraseFile(t, sequencerHome) + seqJwtSecretFile := createJWTSecretFile(t, sequencerHome, jwtSecret) + + // Initialize sequencer node + output, err := sut.RunCmd(evmSingleBinaryPath, + "init", + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + ) + require.NoError(t, err, "failed to init sequencer", output) + + // Modify genesis: enable force inclusion with epoch=2, set da_start_height=1 + enableForceInclusionInGenesis(t, sequencerHome, 2) + setDAStartHeightInGenesis(t, sequencerHome, 1) + + // Copy genesis to full node (will be used when restarting as based sequencer) + output, err = sut.RunCmd(evmSingleBinaryPath, + "init", + "--home", fullNodeHome, + ) + require.NoError(t, err, "failed to init full node", output) + MustCopyFile(t, + filepath.Join(sequencerHome, "config", "genesis.json"), + filepath.Join(fullNodeHome, "config", "genesis.json"), + ) + + // Start sequencer with forced inclusion namespace + seqProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", seqJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + 
"--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer is up with force inclusion enabled") + + // Get sequencer P2P address for sync node to connect to + sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, endpoints.RollkitRPCPort) + t.Logf("Sequencer P2P address: %s", sequencerP2PAddress) + + // Create JWT secret file for full node + fnJwtSecretFile := createJWTSecretFile(t, fullNodeHome, fullNodeJwtSecret) + + // Start sync node (full node) - connects to sequencer via P2P + fnProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.p2p.peers", sequencerP2PAddress, + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync node (full node) is up and syncing from sequencer") + + // ===== PHASE 2: Send Transactions and Wait for Sync ===== + t.Log("Phase 2: Send Transactions and Wait for 
Sync") + + seqClient, err := ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + defer seqClient.Close() + + fnClient, err := ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + defer fnClient.Close() + + ctx := context.Background() + var nonce uint64 = 0 + + // Submit 2 normal transactions to sequencer + var normalTxHashes []common.Hash + for i := 0; i < 2; i++ { + tx := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, tx) + require.NoError(t, err) + normalTxHashes = append(normalTxHashes, tx.Hash()) + t.Logf("Submitted normal tx %d: %s (nonce=%d)", i+1, tx.Hash().Hex(), tx.Nonce()) + } + + // Wait for sync node to sync the transactions + for i, txHash := range normalTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(fnClient, txHash) + }, 20*time.Second, 500*time.Millisecond, "Normal tx %d not synced to full node", i+1) + t.Logf("Normal tx %d synced to full node", i+1) + } + + // Record heights before stopping + seqHeader, err := seqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + fnHeader, err := fnClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + t.Logf("Sequencer height: %d, Full node height: %d", seqHeader.Number.Uint64(), fnHeader.Number.Uint64()) + + // ===== PHASE 3: Stop Sequencer and Sync Node ===== + t.Log("Phase 3: Stop Sequencer and Sync Node") + + // Stop sequencer process + err = seqProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop sequencer process") + time.Sleep(1 * time.Second) + + // Stop sync node process + err = fnProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop full node process") + time.Sleep(1 * time.Second) + t.Log("Both sequencer and sync node stopped") + + // ===== PHASE 4: Restart Sync Node as Based Sequencer ===== + t.Log("Phase 4: Restart Sync Node as Based Sequencer") + + // Restart the same full node as a 
based sequencer + // Reuse the same home directory and data, just add the --based_sequencer flag + basedSeqProcess := sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evnode.node.aggregator=true", + "--evnode.node.based_sequencer=true", + "--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync node restarted as based sequencer") + + // Reconnect to based sequencer + basedSeqClient, err := ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + defer basedSeqClient.Close() + + // ===== PHASE 5: Submit Forced Inclusion Transactions to DA ===== + t.Log("Phase 5: Submit Forced Inclusion Transactions to DA") + + blobClient, err := blobrpc.NewClient(ctx, endpoints.GetDAAddress(), "", "") + require.NoError(t, err, "Failed to create blob RPC client") + defer blobClient.Close() + + daClient := block.NewDAClient( + blobClient, + config.Config{ + DA: config.DAConfig{ + Namespace: DefaultDANamespace, + ForcedInclusionNamespace: "forced-inc", + }, + }, + zerolog.Nop(), + ) + + // Create and submit 3 forced inclusion txs to DA + var forcedTxHashes []common.Hash + for i := 0; i < 3; i++ { + txForce := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + txBytes, err := txForce.MarshalBinary() + require.NoError(t, err) + + result := daClient.Submit(ctx, [][]byte{txBytes}, -1, daClient.GetForcedInclusionNamespace(), nil) + require.Equal(t, 
da.StatusSuccess, result.Code, "Failed to submit forced tx %d to DA: %s", i+1, result.Message) + + forcedTxHashes = append(forcedTxHashes, txForce.Hash()) + t.Logf("Submitted forced inclusion tx %d to DA: %s (nonce=%d)", i+1, txForce.Hash().Hex(), txForce.Nonce()) + } + + // Wait for DA to advance past multiple epochs + t.Log("Waiting for DA to advance past multiple epochs...") + time.Sleep(6 * time.Second) + + // ===== PHASE 6: Verify Based Sequencer Includes Forced Txs ===== + t.Log("Phase 6: Verify Based Sequencer Includes Forced Txs") + + // Wait for based sequencer to include forced inclusion txs + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(basedSeqClient, txHash) + }, 60*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) not included in based sequencer", i+1, txHash.Hex()) + t.Logf("Based sequencer included forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs verified on based sequencer") + + // Get the based sequencer's block height after including forced txs + basedSeqHeader, err := basedSeqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + basedSeqFinalHeight := basedSeqHeader.Number.Uint64() + t.Logf("Based sequencer final height: %d", basedSeqFinalHeight) + + // ===== PHASE 7: Restart Original Sequencer ===== + t.Log("Phase 7: Restart Original Sequencer") + + // Restart the original sequencer + seqProcess = sut.ExecCmd(evmSingleBinaryPath, + "start", + "--evm.jwt-secret-file", seqJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--evnode.node.block_time", DefaultBlockTime, + "--evnode.node.aggregator=true", + "--evnode.signer.passphrase_file", seqPassphraseFile, + "--home", sequencerHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetRollkitRPCListen(), 
+ "--evnode.p2p.listen_address", endpoints.GetRollkitP2PAddress(), + "--evm.engine-url", endpoints.GetSequencerEngineURL(), + "--evm.eth-url", endpoints.GetSequencerEthURL(), + ) + sut.AwaitNodeUp(t, endpoints.GetRollkitRPCAddress(), NodeStartupTimeout) + t.Log("Sequencer restarted successfully") + + // Reconnect to sequencer + seqClient, err = ethclient.Dial(endpoints.GetSequencerEthURL()) + require.NoError(t, err) + + // ===== PHASE 8: Verify Sequencer Catches Up ===== + t.Log("Phase 8: Verify Sequencer Catches Up") + + // Wait for sequencer to catch up and include forced txs + for i, txHash := range forcedTxHashes { + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txHash) + }, 30*time.Second, 1*time.Second, + "Forced inclusion tx %d (%s) should be included after catch-up", i+1, txHash.Hex()) + t.Logf("Sequencer caught up with forced tx %d: %s", i+1, txHash.Hex()) + } + t.Log("All forced inclusion txs verified on sequencer after catch-up") + + // Verify sequencer produces blocks and reaches same height as based sequencer + require.Eventually(t, func() bool { + seqHeader, err := seqClient.HeaderByNumber(ctx, nil) + if err != nil { + return false + } + return seqHeader.Number.Uint64() >= basedSeqFinalHeight + }, 30*time.Second, 1*time.Second, "Sequencer should catch up to based sequencer height") + + seqHeader, err = seqClient.HeaderByNumber(ctx, nil) + require.NoError(t, err) + t.Logf("Sequencer caught up to height: %d", seqHeader.Number.Uint64()) + + // ===== PHASE 9: Stop Based Sequencer and Restart as Normal Sync Node ===== + t.Log("Phase 9: Stop Based Sequencer and Restart as Normal Sync Node") + + // Stop based sequencer + err = basedSeqProcess.Signal(syscall.SIGTERM) + require.NoError(t, err, "failed to stop based sequencer process") + time.Sleep(1 * time.Second) + + // Restart as normal sync node (without --based_sequencer flag, with --p2p.peers to connect to sequencer) + fnProcess = sut.ExecCmd(evmSingleBinaryPath, + "start", + 
"--evm.jwt-secret-file", fnJwtSecretFile, + "--evm.genesis-hash", genesisHash, + "--home", fullNodeHome, + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.forced_inclusion_namespace", "forced-inc", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.p2p.peers", sequencerP2PAddress, + "--evm.engine-url", endpoints.GetFullNodeEngineURL(), + "--evm.eth-url", endpoints.GetFullNodeEthURL(), + ) + sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout) + t.Log("Sync node restarted as normal full node") + + // Reconnect to sync node + fnClient, err = ethclient.Dial(endpoints.GetFullNodeEthURL()) + require.NoError(t, err) + + // ===== PHASE 10: Verify Nodes Are in Sync ===== + t.Log("Phase 10: Verify Nodes Are in Sync") + + // Wait for sync node to catch up to sequencer + require.Eventually(t, func() bool { + seqHeader, err1 := seqClient.HeaderByNumber(ctx, nil) + fnHeader, err2 := fnClient.HeaderByNumber(ctx, nil) + if err1 != nil || err2 != nil { + return false + } + return fnHeader.Number.Uint64() >= seqHeader.Number.Uint64() + }, 30*time.Second, 1*time.Second, "Sync node should catch up to sequencer") + + // Verify both nodes have all forced inclusion txs + for i, txHash := range forcedTxHashes { + seqIncluded := evm.CheckTxIncluded(seqClient, txHash) + fnIncluded := evm.CheckTxIncluded(fnClient, txHash) + require.True(t, seqIncluded, "Forced tx %d should be on sequencer", i+1) + require.True(t, fnIncluded, "Forced tx %d should be on sync node", i+1) + t.Logf("Forced tx %d verified on both nodes: %s", i+1, txHash.Hex()) + } + + // Send a new transaction and verify both nodes get it + txFinal := evm.GetRandomTransaction(t, TestPrivateKey, TestToAddress, DefaultChainID, DefaultGasLimit, &nonce) + err = seqClient.SendTransaction(ctx, txFinal) + 
require.NoError(t, err) + t.Logf("Submitted final tx: %s (nonce=%d)", txFinal.Hash().Hex(), txFinal.Nonce()) + + require.Eventually(t, func() bool { + return evm.CheckTxIncluded(seqClient, txFinal.Hash()) && evm.CheckTxIncluded(fnClient, txFinal.Hash()) + }, 20*time.Second, 500*time.Millisecond, "Final tx should be included on both nodes") + t.Log("Final tx included on both nodes - nodes are in sync") + + t.Log("Test PASSED: Sequencer catch-up with based sequencer verified successfully") + t.Logf(" - Sequencer processed %d normal txs before downtime", len(normalTxHashes)) + t.Logf(" - %d forced inclusion txs submitted to DA during downtime", len(forcedTxHashes)) + t.Logf(" - Based sequencer included all forced txs from DA") + t.Logf(" - Sequencer caught up and replayed all forced txs after restart") + t.Logf(" - Both nodes are in sync") +} diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49c..f5293d907a 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -250,6 +250,66 @@ func (_c *MockClient_GetHeaderNamespace_Call) RunAndReturn(run func() []byte) *M return _c } +// GetLatestDAHeight provides a mock function for the type MockClient +func (_mock *MockClient) GetLatestDAHeight(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLatestDAHeight") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetLatestDAHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestDAHeight' +type MockClient_GetLatestDAHeight_Call struct { + *mock.Call +} 
+ +// GetLatestDAHeight is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) GetLatestDAHeight(ctx interface{}) *MockClient_GetLatestDAHeight_Call { + return &MockClient_GetLatestDAHeight_Call{Call: _e.mock.On("GetLatestDAHeight", ctx)} +} + +func (_c *MockClient_GetLatestDAHeight_Call) Run(run func(ctx context.Context)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) Return(v uint64, err error) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockClient_GetLatestDAHeight_Call) RunAndReturn(run func(ctx context.Context) (uint64, error)) *MockClient_GetLatestDAHeight_Call { + _c.Call.Return(run) + return _c +} + // HasForcedInclusionNamespace provides a mock function for the type MockClient func (_mock *MockClient) HasForcedInclusionNamespace() bool { ret := _mock.Called() diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 684d3fcee5..648021b76a 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -184,6 +184,11 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// GetLatestDAHeight returns the current DA height (the latest height available). +func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} + // Get retrieves blobs by ID (stub implementation). func (d *DummyDA) Get(_ context.Context, _ []datypes.ID, _ []byte) ([]datypes.Blob, error) { return nil, nil