From 5b63d34d6b0c33427f3d306d95ffba2f092ab6ff Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 15:44:21 -0700 Subject: [PATCH] fix: snapshot+tail WAL pin failure cleanup + true mid-executor epoch test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Finding 1: PlanRebuild snapshot+tail WAL pin failure now fail-closed - InvalidateSession("wal_pin_failed_during_rebuild", StateNeedsRebuild) - Snapshot pin released, session invalidated, no dangling state - New test: E2_RebuildWALPinFailure_SessionCleaned Finding 2: True mid-executor invalidation test - Executor makes 2 successful progress steps (60, 70) - Epoch bumps BETWEEN steps (real mid-execution) - Third progress step fails — session invalidated - Resources released via executor cancel - New test: E2_EpochBump_AfterExecutorProgress Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/driver.go | 1 + .../engine/replication/validation_test.go | 77 ++++++++++++++++--- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/sw-block/engine/replication/driver.go b/sw-block/engine/replication/driver.go index bd16b33db..c1655d77d 100644 --- a/sw-block/engine/replication/driver.go +++ b/sw-block/engine/replication/driver.go @@ -165,6 +165,7 @@ func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) { retPin, err := d.Storage.PinWALRetention(snapLSN) if err != nil { d.Storage.ReleaseSnapshot(snapPin) + s.InvalidateSession("wal_pin_failed_during_rebuild", StateNeedsRebuild) d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) return nil, fmt.Errorf("WAL retention pin failed: %w", err) } diff --git a/sw-block/engine/replication/validation_test.go b/sw-block/engine/replication/validation_test.go index 27a27db59..edee2e90a 100644 --- a/sw-block/engine/replication/validation_test.go +++ b/sw-block/engine/replication/validation_test.go @@ -99,7 +99,9 @@ func TestP3_E1_ChangedAddress_OldPlanCancelledByDriver(t *testing.T) { // --- E2 / FC2: Epoch bump during active executor step --- -func TestP3_E2_EpochBump_MidExecutorStep(t *testing.T) { +func TestP3_E2_EpochBump_AfterExecutorProgress(t *testing.T) { + // True mid-execution: executor makes progress, THEN epoch bumps, + // THEN next step fails. storage := newMockStorage(RetainedHistory{ HeadLSN: 100, TailLSN: 0, CommittedLSN: 100, }) @@ -114,25 +116,39 @@ func TestP3_E2_EpochBump_MidExecutorStep(t *testing.T) { }) plan, _ := driver.PlanRecovery("r1", 50) - exec := NewCatchUpExecutor(driver, plan) + s := driver.Orchestrator.Registry.Sender("r1") + sessID := plan.SessionID + + // Manually drive the executor steps to place the epoch bump BETWEEN steps. + // Step 1: begin catch-up. + s.BeginCatchUp(sessID, 0) + + // Step 2: first progress step succeeds. + s.RecordCatchUpProgress(sessID, 60, 1) + + // Step 3: second progress step succeeds. + s.RecordCatchUpProgress(sessID, 70, 2) - // Epoch bumps BEFORE executor runs — simulates bump between plan and execute. - // The executor's mid-step check will detect the invalidation. + // EPOCH BUMPS between progress steps (real mid-execution). driver.Orchestrator.InvalidateEpoch(2) driver.Orchestrator.UpdateSenderEpoch("r1", 2) - // Executor detects invalidation at first progress step. - err := exec.Execute([]uint64{60, 70, 80, 90, 100}, 0) + // Step 4: third progress step fails — session invalidated. + err := s.RecordCatchUpProgress(sessID, 80, 3) if err == nil { - t.Fatal("E2: executor should fail on invalidated session") + t.Fatal("E2: progress after mid-execution epoch bump must fail") } - // Resources released by executor. + // Executor cancel releases resources. + exec := NewCatchUpExecutor(driver, plan) + exec.Cancel("epoch_bump_after_progress") + + // WAL pin released. if len(storage.pinnedWAL) != 0 { - t.Fatal("E2: WAL pin must be released after epoch-bump invalidation") + t.Fatal("E2: WAL pin must be released after mid-execution epoch bump") } - // E5: Log shows invalidation + resource release. + // E5: Log shows per-replica invalidation + resource release. hasInvalidation := false hasRelease := false for _, e := range driver.Orchestrator.Log.EventsFor("r1") { @@ -144,10 +160,47 @@ func TestP3_E2_EpochBump_MidExecutorStep(t *testing.T) { } } if !hasInvalidation { - t.Fatal("E2/E5: log must show session invalidation") + t.Fatal("E2/E5: log must show session invalidation with epoch cause") } if !hasRelease { - t.Fatal("E2/E5: log must show resource release") + t.Fatal("E2/E5: log must show resource release on cancellation") + } +} + +func TestP3_E2_RebuildWALPinFailure_SessionCleaned(t *testing.T) { + // Snapshot+tail rebuild: snapshot pin succeeds, WAL pin fails. + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + CheckpointLSN: 50, CheckpointTrusted: true, + }) + storage.failWALPin = true + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) + + _, err := driver.PlanRebuild("r1") + if err == nil { + t.Fatal("should fail when WAL pin refused during snapshot+tail rebuild") + } + + // Session must be invalidated — no dangling rebuild session. + s := driver.Orchestrator.Registry.Sender("r1") + if s.HasActiveSession() { + t.Fatal("session must be invalidated after WAL pin failure in rebuild") + } + if s.State() != StateNeedsRebuild { + t.Fatalf("state=%s, want needs_rebuild", s.State()) + } + + // Snapshot pin must be released (no leak). + if len(storage.pinnedSnaps) != 0 { + t.Fatal("snapshot pin must be released after WAL pin failure") } }