From 45bf111ce8a784b32f4f9d94eedbf34d08c4755d Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 12:51:38 -0700 Subject: [PATCH] fix: derive WAL pin from actual replay need, PlanRebuild fails closed WAL pin tied to actual recovery contract: - Truncation-only (replica ahead): no WAL pin acquired - Real catch-up: pins from replicaFlushedLSN (actual replay start) - Logs distinguish plan_truncate_only from plan_catchup PlanRebuild precondition checks: - Error on missing sender - Error on no active session - Error on non-rebuild session kind - All fail closed with clear error messages 4 new tests: - ReplicaAhead_NoWALPin: truncation-only, no WAL resources - PlanRebuild_MissingSender: returns error - PlanRebuild_NoSession: returns error - PlanRebuild_NonRebuildSession: returns error Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/driver.go | 42 +++++++++--- sw-block/engine/replication/driver_test.go | 80 ++++++++++++++++++++++ 2 files changed, 111 insertions(+), 11 deletions(-) diff --git a/sw-block/engine/replication/driver.go b/sw-block/engine/replication/driver.go index 89ebcd850..e7b7e2ebf 100644 --- a/sw-block/engine/replication/driver.go +++ b/sw-block/engine/replication/driver.go @@ -82,25 +82,31 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64 return plan, nil case OutcomeCatchUp: - // Acquire WAL retention pin BEFORE continuing execution. - // If pin fails, invalidate the session to avoid a dangling live session. - pin, err := d.Storage.PinWALRetention(replicaFlushedLSN) - if err != nil { - s.InvalidateSession("wal_pin_failed", StateDisconnected) - d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) - return nil, fmt.Errorf("WAL retention pin failed: %w", err) - } - plan.RetentionPin = &pin plan.CatchUpTarget = history.CommittedLSN // Check if truncation is needed (replica ahead). proof := history.ProveRecoverability(replicaFlushedLSN) if proof.Reason == "replica_ahead_needs_truncation" { plan.TruncateLSN = history.CommittedLSN + // Truncation-only: no WAL replay needed. No pin required. + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_truncate_only", + fmt.Sprintf("truncate_to=%d", plan.TruncateLSN)) + return plan, nil + } + + // Real catch-up: pin WAL from the session's actual replay start. + // The replay start is replicaFlushedLSN (where the replica left off). + replayStart := replicaFlushedLSN + pin, err := d.Storage.PinWALRetention(replayStart) + if err != nil { + s.InvalidateSession("wal_pin_failed", StateDisconnected) + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) + return nil, fmt.Errorf("WAL retention pin failed: %w", err) } + plan.RetentionPin = &pin d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_catchup", - fmt.Sprintf("target=%d pin=%d truncate=%d", plan.CatchUpTarget, pin.PinID, plan.TruncateLSN)) + fmt.Sprintf("replay=%d→%d pin=%d", replayStart, plan.CatchUpTarget, pin.PinID)) return plan, nil case OutcomeNeedsRebuild: @@ -113,13 +119,27 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64 // PlanRebuild acquires rebuild resources (snapshot pin + optional WAL pin) // from real storage state. Called after a rebuild assignment. +// Fails closed on: missing sender, no active session, non-rebuild session. func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) { + s := d.Orchestrator.Registry.Sender(replicaID) + if s == nil { + return nil, fmt.Errorf("sender %q not found", replicaID) + } + sessID := s.SessionID() + if sessID == 0 { + return nil, fmt.Errorf("no active session for %q", replicaID) + } + snap := s.SessionSnapshot() + if snap == nil || snap.Kind != SessionRebuild { + return nil, fmt.Errorf("session for %q is not a rebuild session (kind=%s)", replicaID, snap.Kind) + } + history := d.Storage.GetRetainedHistory() source, snapLSN := history.RebuildSourceDecision() plan := &RecoveryPlan{ ReplicaID: replicaID, - SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(), + SessionID: sessID, Outcome: OutcomeNeedsRebuild, RebuildSource: source, } diff --git a/sw-block/engine/replication/driver_test.go b/sw-block/engine/replication/driver_test.go index 17ea7731d..6dbe85a2a 100644 --- a/sw-block/engine/replication/driver_test.go +++ b/sw-block/engine/replication/driver_test.go @@ -316,6 +316,86 @@ func TestDriver_PlanRecovery_ReplicaAhead_Truncation(t *testing.T) { driver.ReleasePlan(plan) } +// --- Truncation-only: no WAL pin needed --- + +func TestDriver_PlanRecovery_ReplicaAhead_NoWALPin(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 0, CommittedLSN: 100, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + // Replica ahead — truncation only, no WAL replay. + plan, err := driver.PlanRecovery("r1", 105) + if err != nil { + t.Fatal(err) + } + if plan.RetentionPin != nil { + t.Fatal("truncation-only should NOT pin WAL") + } + if plan.TruncateLSN != 100 { + t.Fatalf("truncate=%d, want 100", plan.TruncateLSN) + } + if len(storage.pinnedWAL) != 0 { + t.Fatal("no WAL pins should exist for truncation-only") + } +} + +// --- PlanRebuild precondition checks --- + +func TestDriver_PlanRebuild_MissingSender(t *testing.T) { + storage := newMockStorage(RetainedHistory{CommittedLSN: 100}) + driver := NewRecoveryDriver(storage) + + _, err := driver.PlanRebuild("nonexistent") + if err == nil { + t.Fatal("should fail for missing sender") + } +} + +func TestDriver_PlanRebuild_NoSession(t *testing.T) { + storage := newMockStorage(RetainedHistory{CommittedLSN: 100}) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + // No recovery target → no session. + }) + + _, err := driver.PlanRebuild("r1") + if err == nil { + t.Fatal("should fail when no session exists") + } +} + +func TestDriver_PlanRebuild_NonRebuildSession(t *testing.T) { + storage := newMockStorage(RetainedHistory{CommittedLSN: 100}) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, // NOT rebuild + }) + + _, err := driver.PlanRebuild("r1") + if err == nil { + t.Fatal("should fail when session is not rebuild") + } +} + // --- Full-base rebuild pin --- func TestDriver_PlanRebuild_FullBase_PinsBaseImage(t *testing.T) {