Browse Source

fix: add full-base pin and clean up session on WAL pin failure

Full-base rebuild resource:
- StorageAdapter.PinFullBase/ReleaseFullBase for full-extent base image
- PlanRebuild full_base branch now acquires FullBasePin
- RecoveryPlan.FullBasePin field, released by ReleasePlan

Session cleanup on resource failure:
- PlanRecovery invalidates session when WAL pin fails
  (no dangling live session after failed resource acquisition)

3 new tests:
- PlanRebuild_FullBase_PinsBaseImage: pin acquired + released
- PlanRebuild_FullBase_PinFailure: logged + error
- PlanRecovery_WALPinFailure_CleansUpSession: session invalidated,
  sender disconnected (no dangling state)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 1 day ago
parent
commit
d4f7697dd8
  1. 16
      sw-block/engine/replication/adapter.go
  2. 27
      sw-block/engine/replication/driver.go
  3. 132
      sw-block/engine/replication/driver_test.go

16
sw-block/engine/replication/adapter.go

@@ -30,6 +30,22 @@ type StorageAdapter interface {
// ReleaseWALRetention releases a WAL retention hold.
ReleaseWALRetention(pin RetentionPin)
// PinFullBase pins a consistent full-extent base image for full-base
// rebuild. The image must not be mutated while pinned. This is the
// resource contract for the RebuildFullBase path — the hardest rebuild
// case must also have a real pinned source.
PinFullBase(committedLSN uint64) (FullBasePin, error)
// ReleaseFullBase releases a pinned full base image.
ReleaseFullBase(pin FullBasePin)
}
// FullBasePin represents a held reference to a pinned full-extent base image.
// Returned by StorageAdapter.PinFullBase and released via ReleaseFullBase.
type FullBasePin struct {
// CommittedLSN is the LSN the base image was pinned at (the argument
// passed to PinFullBase).
CommittedLSN uint64
// PinID identifies this pin to the storage layer for release.
PinID uint64
// Valid is presumably true for a successfully acquired pin and false
// for the zero value returned on failure — TODO confirm against the
// adapter implementations.
Valid bool
}
// SnapshotPin represents a held reference to a pinned snapshot/checkpoint.

27
sw-block/engine/replication/driver.go

@@ -30,8 +30,9 @@ type RecoveryPlan struct {
Proof *RecoverabilityProof
// Resource pins (non-nil when resources are acquired).
RetentionPin *RetentionPin // for catch-up
SnapshotPin *SnapshotPin // for rebuild
RetentionPin *RetentionPin // for catch-up or snapshot+tail rebuild
SnapshotPin *SnapshotPin // for snapshot+tail rebuild
FullBasePin *FullBasePin // for full-base rebuild
// Targets.
CatchUpTarget uint64 // for catch-up: target LSN
@@ -67,9 +68,10 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64
return nil, result.Error
}
s := d.Orchestrator.Registry.Sender(replicaID)
plan := &RecoveryPlan{
ReplicaID: replicaID,
SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(),
SessionID: s.SessionID(),
Outcome: result.Outcome,
Proof: result.Proof,
}
@@ -80,9 +82,11 @@ func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64
return plan, nil
case OutcomeCatchUp:
// Acquire WAL retention pin.
// Acquire WAL retention pin BEFORE continuing execution.
// If pin fails, invalidate the session to avoid a dangling live session.
pin, err := d.Storage.PinWALRetention(replicaFlushedLSN)
if err != nil {
s.InvalidateSession("wal_pin_failed", StateDisconnected)
d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error())
return nil, fmt.Errorf("WAL retention pin failed: %w", err)
}
@@ -141,7 +145,16 @@ func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) {
d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_snapshot_tail",
fmt.Sprintf("snapshot=%d", snapLSN))
} else {
d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_full_base", "")
// Full-base rebuild: pin a consistent full-extent base image.
basePin, err := d.Storage.PinFullBase(history.CommittedLSN)
if err != nil {
d.Orchestrator.Log.Record(replicaID, plan.SessionID, "full_base_pin_failed", err.Error())
return nil, fmt.Errorf("full base pin failed: %w", err)
}
plan.FullBasePin = &basePin
d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_full_base",
fmt.Sprintf("committed=%d", history.CommittedLSN))
}
return plan, nil
@@ -157,4 +170,8 @@ func (d *RecoveryDriver) ReleasePlan(plan *RecoveryPlan) {
d.Storage.ReleaseSnapshot(*plan.SnapshotPin)
plan.SnapshotPin = nil
}
if plan.FullBasePin != nil {
d.Storage.ReleaseFullBase(*plan.FullBasePin)
plan.FullBasePin = nil
}
}

132
sw-block/engine/replication/driver_test.go

@@ -13,19 +13,22 @@ import (
// --- Mock storage adapter ---
// mockStorage is an in-memory StorageAdapter test double that records pins
// in maps and can be told to refuse each pin kind via the fail* flags.
// NOTE(review): this span is a web diff rendering with removed and added
// lines interleaved — the first four field lines are the pre-change version
// and the re-aligned group below (including pinnedFullBase) is the
// post-change version; only the latter exists in the real file.
type mockStorage struct {
history RetainedHistory
nextPinID atomic.Uint64
pinnedSnaps map[uint64]bool
pinnedWAL map[uint64]bool
history RetainedHistory
nextPinID atomic.Uint64
pinnedSnaps map[uint64]bool
pinnedWAL map[uint64]bool
// pinnedFullBase tracks live full-base pins by PinID.
pinnedFullBase map[uint64]bool
// failSnapshotPin / failWALPin / failFullBasePin make the corresponding
// Pin* method return an error, for failure-path tests.
failSnapshotPin bool
failWALPin bool
failFullBasePin bool
}
// newMockStorage builds a mockStorage over the given retained history with
// all pin maps initialized (writing to a nil map would panic).
// NOTE(review): diff rendering — the first three map-literal lines are the
// pre-change version; the re-aligned group (with pinnedFullBase) is the
// post-change version that exists in the real file.
func newMockStorage(history RetainedHistory) *mockStorage {
return &mockStorage{
history: history,
pinnedSnaps: map[uint64]bool{},
pinnedWAL: map[uint64]bool{},
history: history,
pinnedSnaps: map[uint64]bool{},
pinnedWAL: map[uint64]bool{},
pinnedFullBase: map[uint64]bool{},
}
}
@@ -57,6 +60,19 @@ func (m *mockStorage) ReleaseWALRetention(pin RetentionPin) {
delete(m.pinnedWAL, pin.PinID)
}
// PinFullBase simulates pinning a full-extent base image. When
// failFullBasePin is set it refuses with an error; otherwise it allocates a
// fresh pin ID, records it as live, and returns a valid FullBasePin for
// committedLSN.
func (m *mockStorage) PinFullBase(committedLSN uint64) (FullBasePin, error) {
	if m.failFullBasePin {
		return FullBasePin{}, fmt.Errorf("full base pin refused")
	}
	pinID := m.nextPinID.Add(1)
	m.pinnedFullBase[pinID] = true
	pin := FullBasePin{
		CommittedLSN: committedLSN,
		PinID:        pinID,
		Valid:        true,
	}
	return pin, nil
}
// ReleaseFullBase drops the mock's record of a full-base pin. Deleting a
// missing key is a no-op, so releasing the zero-value pin is harmless.
func (m *mockStorage) ReleaseFullBase(pin FullBasePin) {
delete(m.pinnedFullBase, pin.PinID)
}
// --- Plan + execute: catch-up ---
func TestDriver_PlanRecovery_CatchUp(t *testing.T) {
@@ -300,6 +316,108 @@ func TestDriver_PlanRecovery_ReplicaAhead_Truncation(t *testing.T) {
driver.ReleasePlan(plan)
}
// --- Full-base rebuild pin ---
// TestDriver_PlanRebuild_FullBase_PinsBaseImage verifies that a full-base
// rebuild plan carries a pinned base image and that ReleasePlan drops it.
func TestDriver_PlanRebuild_FullBase_PinsBaseImage(t *testing.T) {
	ms := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 60, CommittedLSN: 100,
		CheckpointLSN: 40, CheckpointTrusted: true,
	})
	drv := NewRecoveryDriver(ms)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild},
	})

	plan, err := drv.PlanRebuild("r1")
	if err != nil {
		t.Fatal(err)
	}
	// Checkpoint at 40 with WAL tail at 60 leaves an unreplayable gap, so
	// the planner must choose the full-base path.
	if got := plan.RebuildSource; got != RebuildFullBase {
		t.Fatalf("source=%s", got)
	}
	if plan.FullBasePin == nil {
		t.Fatal("full_base rebuild must have a pinned base image")
	}
	if n := len(ms.pinnedFullBase); n != 1 {
		t.Fatalf("expected 1 full base pin, got %d", n)
	}

	drv.ReleasePlan(plan)
	if len(ms.pinnedFullBase) != 0 {
		t.Fatal("full base pin should be released")
	}
}
// TestDriver_PlanRebuild_FullBase_PinFailure verifies that a refused base
// pin fails the plan and leaves a full_base_pin_failed event in the log.
func TestDriver_PlanRebuild_FullBase_PinFailure(t *testing.T) {
	ms := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 60, CommittedLSN: 100,
		CheckpointLSN: 40, CheckpointTrusted: true,
	})
	ms.failFullBasePin = true
	drv := NewRecoveryDriver(ms)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild},
	})

	if _, err := drv.PlanRebuild("r1"); err == nil {
		t.Fatal("should fail when full base pin is refused")
	}

	seen := false
	for _, ev := range drv.Orchestrator.Log.EventsFor("r1") {
		if ev.Event == "full_base_pin_failed" {
			seen = true
			break
		}
	}
	if !seen {
		t.Fatal("log should contain full_base_pin_failed")
	}
}
// --- WAL pin failure cleans up session ---
// TestDriver_PlanRecovery_WALPinFailure_CleansUpSession verifies that a
// refused WAL retention pin fails the plan AND tears down the session, so
// no dangling live session survives the failed resource acquisition.
func TestDriver_PlanRecovery_WALPinFailure_CleansUpSession(t *testing.T) {
	ms := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
	})
	ms.failWALPin = true
	drv := NewRecoveryDriver(ms)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	if _, err := drv.PlanRecovery("r1", 70); err == nil {
		t.Fatal("should fail when WAL pin is refused")
	}

	// Session must be invalidated — no dangling live session.
	sender := drv.Orchestrator.Registry.Sender("r1")
	if sender.HasActiveSession() {
		t.Fatal("session should be invalidated after WAL pin failure")
	}
	if st := sender.State(); st != StateDisconnected {
		t.Fatalf("sender should be disconnected after pin failure, got %s", st)
	}
}
// --- Cross-layer contract: storage proves recoverability ---
func TestDriver_CrossLayer_StorageProvesRecoverability(t *testing.T) {

Loading…
Cancel
Save