From d4f7697dd8f21fbec34a255a9c0085c35188cf05 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 12:20:24 -0700 Subject: [PATCH] fix: add full-base pin and clean up session on WAL pin failure Full-base rebuild resource: - StorageAdapter.PinFullBase/ReleaseFullBase for full-extent base image - PlanRebuild full_base branch now acquires FullBasePin - RecoveryPlan.FullBasePin field, released by ReleasePlan Session cleanup on resource failure: - PlanRecovery invalidates session when WAL pin fails (no dangling live session after failed resource acquisition) 3 new tests: - PlanRebuild_FullBase_PinsBaseImage: pin acquired + released - PlanRebuild_FullBase_PinFailure: logged + error - PlanRecovery_WALPinFailure_CleansUpSession: session invalidated, sender disconnected (no dangling state) Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/adapter.go | 16 +++ sw-block/engine/replication/driver.go | 27 ++++- sw-block/engine/replication/driver_test.go | 132 +++++++++++++++++++-- 3 files changed, 163 insertions(+), 12 deletions(-) diff --git a/sw-block/engine/replication/adapter.go b/sw-block/engine/replication/adapter.go index 39e65dab0..890c4bbb3 100644 --- a/sw-block/engine/replication/adapter.go +++ b/sw-block/engine/replication/adapter.go @@ -30,6 +30,22 @@ type StorageAdapter interface { // ReleaseWALRetention releases a WAL retention hold. ReleaseWALRetention(pin RetentionPin) + + // PinFullBase pins a consistent full-extent base image for full-base + // rebuild. The image must not be mutated while pinned. This is the + // resource contract for the RebuildFullBase path — the hardest rebuild + // case must also have a real pinned source. + PinFullBase(committedLSN uint64) (FullBasePin, error) + + // ReleaseFullBase releases a pinned full base image. + ReleaseFullBase(pin FullBasePin) +} + +// FullBasePin represents a held reference to a pinned full-extent base image. +type FullBasePin struct { + CommittedLSN uint64 + PinID uint64 + Valid bool } // SnapshotPin represents a held reference to a pinned snapshot/checkpoint. diff --git a/sw-block/engine/replication/driver.go b/sw-block/engine/replication/driver.go index ea2fa3acf..89ebcd850 100644 --- a/sw-block/engine/replication/driver.go +++ b/sw-block/engine/replication/driver.go @@ -30,8 +30,9 @@ type RecoveryPlan struct { Proof *RecoverabilityProof // Resource pins (non-nil when resources are acquired). - RetentionPin *RetentionPin // for catch-up - SnapshotPin *SnapshotPin // for rebuild + RetentionPin *RetentionPin // for catch-up or snapshot+tail rebuild + SnapshotPin *SnapshotPin // for snapshot+tail rebuild + FullBasePin *FullBasePin // for full-base rebuild // Targets. CatchUpTarget uint64 // for catch-up: target LSN @@ -67,9 +68,10 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64 return nil, result.Error } + s := d.Orchestrator.Registry.Sender(replicaID) plan := &RecoveryPlan{ ReplicaID: replicaID, - SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(), + SessionID: s.SessionID(), Outcome: result.Outcome, Proof: result.Proof, } @@ -80,9 +82,11 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64 return plan, nil case OutcomeCatchUp: - // Acquire WAL retention pin. + // Acquire WAL retention pin BEFORE continuing execution. + // If pin fails, invalidate the session to avoid a dangling live session. pin, err := d.Storage.PinWALRetention(replicaFlushedLSN) if err != nil { + s.InvalidateSession("wal_pin_failed", StateDisconnected) d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) return nil, fmt.Errorf("WAL retention pin failed: %w", err) } @@ -141,7 +145,16 @@ func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) { d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_snapshot_tail", fmt.Sprintf("snapshot=%d", snapLSN)) } else { - d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_full_base", "") + // Full-base rebuild: pin a consistent full-extent base image. + basePin, err := d.Storage.PinFullBase(history.CommittedLSN) + if err != nil { + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "full_base_pin_failed", err.Error()) + return nil, fmt.Errorf("full base pin failed: %w", err) + } + plan.FullBasePin = &basePin + + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_full_base", + fmt.Sprintf("committed=%d", history.CommittedLSN)) } return plan, nil @@ -157,4 +170,8 @@ func (d *RecoveryDriver) ReleasePlan(plan *RecoveryPlan) { d.Storage.ReleaseSnapshot(*plan.SnapshotPin) plan.SnapshotPin = nil } + if plan.FullBasePin != nil { + d.Storage.ReleaseFullBase(*plan.FullBasePin) + plan.FullBasePin = nil + } } diff --git a/sw-block/engine/replication/driver_test.go b/sw-block/engine/replication/driver_test.go index abb86400d..17ea7731d 100644 --- a/sw-block/engine/replication/driver_test.go +++ b/sw-block/engine/replication/driver_test.go @@ -13,19 +13,22 @@ import ( // --- Mock storage adapter --- type mockStorage struct { - history RetainedHistory - nextPinID atomic.Uint64 - pinnedSnaps map[uint64]bool - pinnedWAL map[uint64]bool + history RetainedHistory + nextPinID atomic.Uint64 + pinnedSnaps map[uint64]bool + pinnedWAL map[uint64]bool + pinnedFullBase map[uint64]bool failSnapshotPin bool failWALPin bool + failFullBasePin bool } func newMockStorage(history RetainedHistory) *mockStorage { return &mockStorage{ - history: history, - pinnedSnaps: map[uint64]bool{}, - pinnedWAL: map[uint64]bool{}, + history: history, + pinnedSnaps: map[uint64]bool{}, + pinnedWAL: map[uint64]bool{}, + pinnedFullBase: map[uint64]bool{}, } } @@ -57,6 +60,19 @@ func (m *mockStorage) ReleaseWALRetention(pin RetentionPin) { delete(m.pinnedWAL, pin.PinID) } +func (m *mockStorage) PinFullBase(committedLSN uint64) (FullBasePin, error) { + if m.failFullBasePin { + return FullBasePin{}, fmt.Errorf("full base pin refused") + } + id := m.nextPinID.Add(1) + m.pinnedFullBase[id] = true + return FullBasePin{CommittedLSN: committedLSN, PinID: id, Valid: true}, nil +} + +func (m *mockStorage) ReleaseFullBase(pin FullBasePin) { + delete(m.pinnedFullBase, pin.PinID) +} + // --- Plan + execute: catch-up --- func TestDriver_PlanRecovery_CatchUp(t *testing.T) { @@ -300,6 +316,108 @@ func TestDriver_PlanRecovery_ReplicaAhead_Truncation(t *testing.T) { driver.ReleasePlan(plan) } +// --- Full-base rebuild pin --- + +func TestDriver_PlanRebuild_FullBase_PinsBaseImage(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 60, CommittedLSN: 100, + CheckpointLSN: 40, CheckpointTrusted: true, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) + + plan, err := driver.PlanRebuild("r1") + if err != nil { + t.Fatal(err) + } + // Checkpoint at 40, tail at 60 → unreplayable → full base. + if plan.RebuildSource != RebuildFullBase { + t.Fatalf("source=%s", plan.RebuildSource) + } + if plan.FullBasePin == nil { + t.Fatal("full_base rebuild must have a pinned base image") + } + if len(storage.pinnedFullBase) != 1 { + t.Fatalf("expected 1 full base pin, got %d", len(storage.pinnedFullBase)) + } + + driver.ReleasePlan(plan) + if len(storage.pinnedFullBase) != 0 { + t.Fatal("full base pin should be released") + } +} + +func TestDriver_PlanRebuild_FullBase_PinFailure(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 60, CommittedLSN: 100, + CheckpointLSN: 40, CheckpointTrusted: true, + }) + storage.failFullBasePin = true + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) + + _, err := driver.PlanRebuild("r1") + if err == nil { + t.Fatal("should fail when full base pin is refused") + } + + hasFailure := false + for _, e := range driver.Orchestrator.Log.EventsFor("r1") { + if e.Event == "full_base_pin_failed" { + hasFailure = true + } + } + if !hasFailure { + t.Fatal("log should contain full_base_pin_failed") + } +} + +// --- WAL pin failure cleans up session --- + +func TestDriver_PlanRecovery_WALPinFailure_CleansUpSession(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + }) + storage.failWALPin = true + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + _, err := driver.PlanRecovery("r1", 70) + if err == nil { + t.Fatal("should fail when WAL pin is refused") + } + + // Session must be invalidated — no dangling live session. + s := driver.Orchestrator.Registry.Sender("r1") + if s.HasActiveSession() { + t.Fatal("session should be invalidated after WAL pin failure") + } + if s.State() != StateDisconnected { + t.Fatalf("sender should be disconnected after pin failure, got %s", s.State()) + } +} + // --- Cross-layer contract: storage proves recoverability --- func TestDriver_CrossLayer_StorageProvesRecoverability(t *testing.T) {