From f73a3fdab2ce869e157be4fdfb43d3f6e94362e6 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 11:35:25 -0700 Subject: [PATCH] feat: add storage/control adapters and recovery driver (Phase 06 P0/P1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 06 module boundaries: adapter.go — StorageAdapter + ControlPlaneAdapter interfaces: - GetRetainedHistory: real WAL retention state - PinSnapshot / ReleaseSnapshot: rebuild resource management - PinWALRetention / ReleaseWALRetention: catch-up resource management - HandleHeartbeat / HandleFailover: control-plane event conversion driver.go — RecoveryDriver replaces synchronous convenience: - PlanRecovery: connect + handshake from storage state + acquire resources - PlanRebuild: acquire snapshot + WAL pins for rebuild - ReleasePlan: release all acquired resources Convenience flow classification: - ProcessAssignment, UpdateSenderEpoch, InvalidateEpoch → stepwise engine tasks - ExecuteRecovery → planner (connect + classify) - CompleteCatchUp, CompleteRebuild → TEST-ONLY convenience 7 new tests (driver_test.go): - CatchUp plan + execute with WAL pin - ZeroGap plan (no resources pinned) - NeedsRebuild → rebuild plan with resource acquisition - WAL pin failure → logged + error - Snapshot pin failure → logged + error - ReplicaAhead truncation through driver - Cross-layer: storage proves recoverability, engine consumes proof Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/adapter.go | 71 +++++ sw-block/engine/replication/driver.go | 160 ++++++++++ sw-block/engine/replication/driver_test.go | 344 +++++++++++++++++++++ 3 files changed, 575 insertions(+) create mode 100644 sw-block/engine/replication/adapter.go create mode 100644 sw-block/engine/replication/driver.go create mode 100644 sw-block/engine/replication/driver_test.go diff --git a/sw-block/engine/replication/adapter.go b/sw-block/engine/replication/adapter.go new file mode 100644 index 000000000..39e65dab0 --- /dev/null +++ b/sw-block/engine/replication/adapter.go @@ -0,0 +1,71 @@ +package replication + +// === Phase 06: Storage and Control-Plane Adapter Interfaces === +// +// These interfaces define the boundary between the engine replication core +// and external systems (storage backend, coordinator/control plane). +// The engine consumes these interfaces — it does not reach into storage +// or control-plane internals directly. + +// StorageAdapter provides real retained-history and checkpoint state +// from the storage backend. The engine uses this to make recovery +// decisions grounded in actual data, not reconstructed test inputs. +type StorageAdapter interface { + // GetRetainedHistory returns the current WAL retention state. + // Must reflect actual TailLSN, HeadLSN, CommittedLSN, and checkpoint. + GetRetainedHistory() RetainedHistory + + // PinSnapshot pins a checkpoint/base image at the given LSN for + // rebuild use. The snapshot must not be garbage-collected while pinned. + // Returns an error if no valid snapshot exists at that LSN. + PinSnapshot(checkpointLSN uint64) (SnapshotPin, error) + + // ReleaseSnapshot releases a previously pinned snapshot. + ReleaseSnapshot(pin SnapshotPin) + + // PinWALRetention holds WAL entries from startLSN to prevent reclaim. + // The engine calls this before starting catch-up to ensure the WAL + // tail does not advance past the required range. + PinWALRetention(startLSN uint64) (RetentionPin, error) + + // ReleaseWALRetention releases a WAL retention hold. + ReleaseWALRetention(pin RetentionPin) +} + +// SnapshotPin represents a held reference to a pinned snapshot/checkpoint. +type SnapshotPin struct { + LSN uint64 + PinID uint64 // unique identifier for this pin + Valid bool +} + +// RetentionPin represents a held reference to a WAL retention range. +type RetentionPin struct { + StartLSN uint64 + PinID uint64 + Valid bool +} + +// ControlPlaneAdapter converts external assignment events into +// AssignmentIntent for the orchestrator. +type ControlPlaneAdapter interface { + // HandleHeartbeat processes a heartbeat from a volume server and + // returns any assignment updates that should be applied. + HandleHeartbeat(serverID string, volumes []VolumeHeartbeat) []AssignmentIntent + + // HandleFailover processes a failover event and returns assignments + // for the affected replicas. + HandleFailover(deadServerID string) []AssignmentIntent +} + +// VolumeHeartbeat represents one volume's state in a heartbeat. +type VolumeHeartbeat struct { + VolumeID string + ReplicaID string + Epoch uint64 + FlushedLSN uint64 + State string + DataAddr string + CtrlAddr string + AddrVersion uint64 +} diff --git a/sw-block/engine/replication/driver.go b/sw-block/engine/replication/driver.go new file mode 100644 index 000000000..ea2fa3acf --- /dev/null +++ b/sw-block/engine/replication/driver.go @@ -0,0 +1,160 @@ +package replication + +import "fmt" + +// === Phase 06: Execution Driver === +// +// Convenience flow classification (Phase 06 P0): +// +// ProcessAssignment → stepwise engine task (real entry point) +// ExecuteRecovery → planner (connect + classify outcome) +// CompleteCatchUp → TEST-ONLY convenience (bundles plan+execute+complete) +// CompleteRebuild → TEST-ONLY convenience (bundles plan+execute+complete) +// UpdateSenderEpoch → stepwise engine task +// InvalidateEpoch → stepwise engine task +// +// The real engine flow splits catch-up and rebuild into: +// 1. Plan: acquire resources (pin WAL or snapshot) +// 2. Execute: stream entries stepwise (not one-shot) +// 3. Complete: release resources, transition to InSync +// +// RecoveryDriver is the Phase 06 replacement for the synchronous +// convenience helpers. It plans, acquires resources, and provides +// a stepwise execution interface. + +// RecoveryPlan represents a planned recovery operation with acquired resources. +type RecoveryPlan struct { + ReplicaID string + SessionID uint64 + Outcome RecoveryOutcome + Proof *RecoverabilityProof + + // Resource pins (non-nil when resources are acquired). + RetentionPin *RetentionPin // for catch-up + SnapshotPin *SnapshotPin // for rebuild + + // Targets. + CatchUpTarget uint64 // for catch-up: target LSN + TruncateLSN uint64 // non-zero if truncation required + RebuildSource RebuildSource +} + +// RecoveryDriver plans and executes recovery operations using real +// storage adapter inputs. It replaces the synchronous convenience +// helpers (CompleteCatchUp, CompleteRebuild) with a resource-aware, +// stepwise execution model. +type RecoveryDriver struct { + Orchestrator *RecoveryOrchestrator + Storage StorageAdapter +} + +// NewRecoveryDriver creates a driver with a fresh orchestrator. +func NewRecoveryDriver(storage StorageAdapter) *RecoveryDriver { + return &RecoveryDriver{ + Orchestrator: NewRecoveryOrchestrator(), + Storage: storage, + } +} + +// PlanRecovery connects, handshakes from real storage state, classifies +// the outcome, and acquires the necessary resources (WAL pin or snapshot pin). +// Returns a RecoveryPlan that the caller can execute stepwise. +func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64) (*RecoveryPlan, error) { + history := d.Storage.GetRetainedHistory() + + result := d.Orchestrator.ExecuteRecovery(replicaID, replicaFlushedLSN, &history) + if result.Error != nil { + return nil, result.Error + } + + plan := &RecoveryPlan{ + ReplicaID: replicaID, + SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(), + Outcome: result.Outcome, + Proof: result.Proof, + } + + switch result.Outcome { + case OutcomeZeroGap: + // Already completed by ExecuteRecovery. + return plan, nil + + case OutcomeCatchUp: + // Acquire WAL retention pin. + pin, err := d.Storage.PinWALRetention(replicaFlushedLSN) + if err != nil { + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) + return nil, fmt.Errorf("WAL retention pin failed: %w", err) + } + plan.RetentionPin = &pin + plan.CatchUpTarget = history.CommittedLSN + + // Check if truncation is needed (replica ahead). + proof := history.ProveRecoverability(replicaFlushedLSN) + if proof.Reason == "replica_ahead_needs_truncation" { + plan.TruncateLSN = history.CommittedLSN + } + + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_catchup", + fmt.Sprintf("target=%d pin=%d truncate=%d", plan.CatchUpTarget, pin.PinID, plan.TruncateLSN)) + return plan, nil + + case OutcomeNeedsRebuild: + // No resource acquisition — needs rebuild assignment first. + return plan, nil + } + + return plan, nil +} + +// PlanRebuild acquires rebuild resources (snapshot pin + optional WAL pin) +// from real storage state. Called after a rebuild assignment. +func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) { + history := d.Storage.GetRetainedHistory() + source, snapLSN := history.RebuildSourceDecision() + + plan := &RecoveryPlan{ + ReplicaID: replicaID, + SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(), + Outcome: OutcomeNeedsRebuild, + RebuildSource: source, + } + + if source == RebuildSnapshotTail { + // Pin snapshot. + snapPin, err := d.Storage.PinSnapshot(snapLSN) + if err != nil { + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "snapshot_pin_failed", err.Error()) + return nil, fmt.Errorf("snapshot pin failed: %w", err) + } + plan.SnapshotPin = &snapPin + + // Pin WAL retention for tail replay. + retPin, err := d.Storage.PinWALRetention(snapLSN) + if err != nil { + d.Storage.ReleaseSnapshot(snapPin) + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "wal_pin_failed", err.Error()) + return nil, fmt.Errorf("WAL retention pin failed: %w", err) + } + plan.RetentionPin = &retPin + + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_snapshot_tail", + fmt.Sprintf("snapshot=%d", snapLSN)) + } else { + d.Orchestrator.Log.Record(replicaID, plan.SessionID, "plan_rebuild_full_base", "") + } + + return plan, nil +} + +// ReleasePlan releases any resources acquired by a plan. +func (d *RecoveryDriver) ReleasePlan(plan *RecoveryPlan) { + if plan.RetentionPin != nil { + d.Storage.ReleaseWALRetention(*plan.RetentionPin) + plan.RetentionPin = nil + } + if plan.SnapshotPin != nil { + d.Storage.ReleaseSnapshot(*plan.SnapshotPin) + plan.SnapshotPin = nil + } +} diff --git a/sw-block/engine/replication/driver_test.go b/sw-block/engine/replication/driver_test.go new file mode 100644 index 000000000..abb86400d --- /dev/null +++ b/sw-block/engine/replication/driver_test.go @@ -0,0 +1,344 @@ +package replication + +import ( + "fmt" + "sync/atomic" + "testing" +) + +// ============================================================ +// Phase 06 P0/P1: Recovery driver tests with mock storage adapter +// ============================================================ + +// --- Mock storage adapter --- + +type mockStorage struct { + history RetainedHistory + nextPinID atomic.Uint64 + pinnedSnaps map[uint64]bool + pinnedWAL map[uint64]bool + failSnapshotPin bool + failWALPin bool +} + +func newMockStorage(history RetainedHistory) *mockStorage { + return &mockStorage{ + history: history, + pinnedSnaps: map[uint64]bool{}, + pinnedWAL: map[uint64]bool{}, + } +} + +func (m *mockStorage) GetRetainedHistory() RetainedHistory { return m.history } + +func (m *mockStorage) PinSnapshot(lsn uint64) (SnapshotPin, error) { + if m.failSnapshotPin { + return SnapshotPin{}, fmt.Errorf("snapshot pin refused") + } + id := m.nextPinID.Add(1) + m.pinnedSnaps[id] = true + return SnapshotPin{LSN: lsn, PinID: id, Valid: true}, nil +} + +func (m *mockStorage) ReleaseSnapshot(pin SnapshotPin) { + delete(m.pinnedSnaps, pin.PinID) +} + +func (m *mockStorage) PinWALRetention(startLSN uint64) (RetentionPin, error) { + if m.failWALPin { + return RetentionPin{}, fmt.Errorf("WAL retention pin refused") + } + id := m.nextPinID.Add(1) + m.pinnedWAL[id] = true + return RetentionPin{StartLSN: startLSN, PinID: id, Valid: true}, nil +} + +func (m *mockStorage) ReleaseWALRetention(pin RetentionPin) { + delete(m.pinnedWAL, pin.PinID) +} + +// --- Plan + execute: catch-up --- + +func TestDriver_PlanRecovery_CatchUp(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + plan, err := driver.PlanRecovery("r1", 70) + if err != nil { + t.Fatal(err) + } + if plan.Outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", plan.Outcome) + } + if plan.RetentionPin == nil { + t.Fatal("WAL retention should be pinned") + } + if plan.CatchUpTarget != 100 { + t.Fatalf("target=%d", plan.CatchUpTarget) + } + if !plan.Proof.Recoverable { + t.Fatalf("proof: %s", plan.Proof.Reason) + } + + // WAL is pinned. + if len(storage.pinnedWAL) != 1 { + t.Fatalf("expected 1 WAL pin, got %d", len(storage.pinnedWAL)) + } + + // Execute catch-up via orchestrator. + driver.Orchestrator.CompleteCatchUp("r1", CatchUpOptions{TargetLSN: plan.CatchUpTarget}) + + // Release resources. + driver.ReleasePlan(plan) + if len(storage.pinnedWAL) != 0 { + t.Fatal("WAL pin should be released") + } +} + +// --- Plan + execute: zero-gap --- + +func TestDriver_PlanRecovery_ZeroGap(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 0, CommittedLSN: 100, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + plan, err := driver.PlanRecovery("r1", 100) + if err != nil { + t.Fatal(err) + } + if plan.Outcome != OutcomeZeroGap { + t.Fatalf("outcome=%s", plan.Outcome) + } + + // Zero-gap: no resources pinned. + if plan.RetentionPin != nil { + t.Fatal("zero-gap should not pin WAL") + } + + // Already completed. + if driver.Orchestrator.Registry.Sender("r1").State() != StateInSync { + t.Fatalf("state=%s", driver.Orchestrator.Registry.Sender("r1").State()) + } +} + +// --- Plan + execute: needs rebuild --- + +func TestDriver_PlanRecovery_NeedsRebuild_ThenRebuild(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 60, CommittedLSN: 100, + CheckpointLSN: 50, CheckpointTrusted: true, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + // Plan: catch-up fails. + plan, err := driver.PlanRecovery("r1", 30) + if err != nil { + t.Fatal(err) + } + if plan.Outcome != OutcomeNeedsRebuild { + t.Fatalf("outcome=%s", plan.Outcome) + } + + // Rebuild assignment. + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) + + // Plan rebuild with resource acquisition. + rebuildPlan, err := driver.PlanRebuild("r1") + if err != nil { + t.Fatal(err) + } + // Checkpoint at 50, tail at 60 → unreplayable → full base. + if rebuildPlan.RebuildSource != RebuildFullBase { + t.Fatalf("source=%s (checkpoint at 50 but tail at 60)", rebuildPlan.RebuildSource) + } + + // Execute rebuild via orchestrator. + driver.Orchestrator.CompleteRebuild("r1", &storage.history) + + if driver.Orchestrator.Registry.Sender("r1").State() != StateInSync { + t.Fatalf("state=%s", driver.Orchestrator.Registry.Sender("r1").State()) + } +} + +// --- Resource failure: WAL pin refused --- + +func TestDriver_PlanRecovery_WALPinFailure(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + }) + storage.failWALPin = true + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + _, err := driver.PlanRecovery("r1", 70) + if err == nil { + t.Fatal("should fail when WAL pin is refused") + } + + // Log should show the failure. + hasFailure := false + for _, e := range driver.Orchestrator.Log.EventsFor("r1") { + if e.Event == "wal_pin_failed" { + hasFailure = true + } + } + if !hasFailure { + t.Fatal("log should contain wal_pin_failed") + } +} + +// --- Resource failure: snapshot pin refused → fallback --- + +func TestDriver_PlanRebuild_SnapshotPinFailure(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + CheckpointLSN: 50, CheckpointTrusted: true, + }) + storage.failSnapshotPin = true + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) + + _, err := driver.PlanRebuild("r1") + if err == nil { + t.Fatal("should fail when snapshot pin is refused") + } + + hasFailure := false + for _, e := range driver.Orchestrator.Log.EventsFor("r1") { + if e.Event == "snapshot_pin_failed" { + hasFailure = true + } + } + if !hasFailure { + t.Fatal("log should contain snapshot_pin_failed") + } +} + +// --- Replica-ahead with truncation through driver --- + +func TestDriver_PlanRecovery_ReplicaAhead_Truncation(t *testing.T) { + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 0, CommittedLSN: 100, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + plan, err := driver.PlanRecovery("r1", 105) // replica ahead + if err != nil { + t.Fatal(err) + } + if plan.Outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", plan.Outcome) + } + if plan.TruncateLSN != 100 { + t.Fatalf("truncate=%d, want 100", plan.TruncateLSN) + } + + // Execute with truncation. + err = driver.Orchestrator.CompleteCatchUp("r1", CatchUpOptions{ + TargetLSN: plan.CatchUpTarget, + TruncateLSN: plan.TruncateLSN, + }) + if err != nil { + t.Fatalf("catch-up with truncation: %v", err) + } + + driver.ReleasePlan(plan) +} + +// --- Cross-layer contract: storage proves recoverability --- + +func TestDriver_CrossLayer_StorageProvesRecoverability(t *testing.T) { + // The engine asks "is this recoverable?" and the storage adapter + // answers from real state — not from test-reconstructed inputs. + storage := newMockStorage(RetainedHistory{ + HeadLSN: 100, TailLSN: 50, CommittedLSN: 100, + CheckpointLSN: 40, CheckpointTrusted: true, + }) + driver := NewRecoveryDriver(storage) + + driver.Orchestrator.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + // Engine asks storage for recoverability proof. + history := storage.GetRetainedHistory() + proof := history.ProveRecoverability(60) // gap 60→100 + + if !proof.Recoverable { + t.Fatalf("storage should prove recoverable: %s", proof.Reason) + } + + // Engine asks for rebuild source decision. + source, snapLSN := history.RebuildSourceDecision() + // Checkpoint at 40, tail at 50 → checkpoint < tail → unreplayable. + if source != RebuildFullBase { + t.Fatalf("source=%s snap=%d (checkpoint 40 < tail 50)", source, snapLSN) + } + + // Failure is observable: log from PlanRecovery. + plan, _ := driver.PlanRecovery("r1", 60) + if plan.Proof == nil || !plan.Proof.Recoverable { + t.Fatal("plan should carry proof from storage") + } + + driver.ReleasePlan(plan) +}