From 4df61f290bfbf62d474ca801d5072c844d8d1f7b Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 15:51:21 -0700 Subject: [PATCH] fix: true mid-executor invalidation test via OnStep hook CatchUpExecutor.OnStep: optional callback fired between executor-managed progress steps. Enables deterministic fault injection (epoch bump) between steps without racing or manual sender calls. E2_EpochBump_MidExecutorLoop: - Executor runs 5 progress steps - OnStep hook bumps epoch after step 1 (after 2 successful steps) - Executor's own loop detects invalidation at step 2's check - Resources released by executor's release path (not manual cancel) - Log shows session_invalidated + exec_resources_released This closes the remaining FC2 gap: invalidation is now detected and cleaned up by the executor itself, not by external code. Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/executor.go | 10 ++++ .../engine/replication/validation_test.go | 51 ++++++++----------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/sw-block/engine/replication/executor.go b/sw-block/engine/replication/executor.go index 410f48194..9fbe3f8fe 100644 --- a/sw-block/engine/replication/executor.go +++ b/sw-block/engine/replication/executor.go @@ -16,6 +16,11 @@ type CatchUpExecutor struct { replicaID string sessID uint64 released bool + + // OnStep is an optional callback invoked between executor-managed steps. + // Used for deterministic fault injection in tests (e.g., epoch bump). + // step is the 0-based index of the completed step. + OnStep func(step int) } // NewCatchUpExecutor creates an executor from a plan. The plan's resources @@ -68,6 +73,11 @@ func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick uint64) error return err } + // Fire step callback (test hook for fault injection). + if e.OnStep != nil { + e.OnStep(i) + } + // Check budget after each step. v, err := s.CheckBudget(e.sessID, tick) if err != nil { diff --git a/sw-block/engine/replication/validation_test.go b/sw-block/engine/replication/validation_test.go index edee2e90a..e97693f16 100644 --- a/sw-block/engine/replication/validation_test.go +++ b/sw-block/engine/replication/validation_test.go @@ -99,9 +99,10 @@ func TestP3_E1_ChangedAddress_OldPlanCancelledByDriver(t *testing.T) { // --- E2 / FC2: Epoch bump during active executor step --- -func TestP3_E2_EpochBump_AfterExecutorProgress(t *testing.T) { - // True mid-execution: executor makes progress, THEN epoch bumps, - // THEN next step fails. +func TestP3_E2_EpochBump_MidExecutorLoop(t *testing.T) { + // True mid-execution through executor's own loop: + // executor makes progress steps, epoch bumps BETWEEN executor-managed + // steps via OnStep hook, executor detects invalidation at next step. storage := newMockStorage(RetainedHistory{ HeadLSN: 100, TailLSN: 0, CommittedLSN: 100, }) @@ -116,39 +117,29 @@ func TestP3_E2_EpochBump_AfterExecutorProgress(t *testing.T) { }) plan, _ := driver.PlanRecovery("r1", 50) - s := driver.Orchestrator.Registry.Sender("r1") - sessID := plan.SessionID - - // Manually drive the executor steps to place the epoch bump BETWEEN steps. - // Step 1: begin catch-up. - s.BeginCatchUp(sessID, 0) - - // Step 2: first progress step succeeds. - s.RecordCatchUpProgress(sessID, 60, 1) - - // Step 3: second progress step succeeds. - s.RecordCatchUpProgress(sessID, 70, 2) + exec := NewCatchUpExecutor(driver, plan) - // EPOCH BUMPS between progress steps (real mid-execution). - driver.Orchestrator.InvalidateEpoch(2) - driver.Orchestrator.UpdateSenderEpoch("r1", 2) + // OnStep hook: bump epoch after step 1 (executor has made real progress). + exec.OnStep = func(step int) { + if step == 1 { // after second progress step succeeds + driver.Orchestrator.InvalidateEpoch(2) + driver.Orchestrator.UpdateSenderEpoch("r1", 2) + } + } - // Step 4: third progress step fails — session invalidated. - err := s.RecordCatchUpProgress(sessID, 80, 3) + // Executor runs 5 steps. After step 1, epoch bumps. + // Step 2's invalidation check catches the stale session. + err := exec.Execute([]uint64{60, 70, 80, 90, 100}, 0) if err == nil { - t.Fatal("E2: progress after mid-execution epoch bump must fail") + t.Fatal("E2: executor must fail when epoch bumps between its managed steps") } - // Executor cancel releases resources. - exec := NewCatchUpExecutor(driver, plan) - exec.Cancel("epoch_bump_after_progress") - - // WAL pin released. + // Resources released by executor's own release path. if len(storage.pinnedWAL) != 0 { - t.Fatal("E2: WAL pin must be released after mid-execution epoch bump") + t.Fatal("E2: WAL pin must be released by executor after mid-loop invalidation") } - // E5: Log shows per-replica invalidation + resource release. + // E5: Log shows per-replica invalidation + executor resource release. hasInvalidation := false hasRelease := false for _, e := range driver.Orchestrator.Log.EventsFor("r1") { @@ -160,10 +151,10 @@ func TestP3_E2_EpochBump_AfterExecutorProgress(t *testing.T) { } } if !hasInvalidation { - t.Fatal("E2/E5: log must show session invalidation with epoch cause") + t.Fatal("E2/E5: log must show session invalidation") } if !hasRelease { - t.Fatal("E2/E5: log must show resource release on cancellation") + t.Fatal("E2/E5: log must show executor resource release") } }