Browse Source

feat: add storage/control adapters and recovery driver (Phase 06 P0/P1)

Phase 06 module boundaries:

adapter.go — StorageAdapter + ControlPlaneAdapter interfaces:
- GetRetainedHistory: real WAL retention state
- PinSnapshot / ReleaseSnapshot: rebuild resource management
- PinWALRetention / ReleaseWALRetention: catch-up resource management
- HandleHeartbeat / HandleFailover: control-plane event conversion

driver.go — RecoveryDriver replaces synchronous convenience:
- PlanRecovery: connect + handshake from storage state + acquire resources
- PlanRebuild: acquire snapshot + WAL pins for rebuild
- ReleasePlan: release all acquired resources

Convenience flow classification:
- ProcessAssignment, UpdateSenderEpoch, InvalidateEpoch → stepwise engine tasks
- ExecuteRecovery → planner (connect + classify)
- CompleteCatchUp, CompleteRebuild → TEST-ONLY convenience

7 new tests (driver_test.go):
- CatchUp plan + execute with WAL pin
- ZeroGap plan (no resources pinned)
- NeedsRebuild → rebuild plan with resource acquisition
- WAL pin failure → logged + error
- Snapshot pin failure → logged + error
- ReplicaAhead truncation through driver
- Cross-layer: storage proves recoverability, engine consumes proof

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 1 day ago
parent
commit
f73a3fdab2
  1. 71
      sw-block/engine/replication/adapter.go
  2. 160
      sw-block/engine/replication/driver.go
  3. 344
      sw-block/engine/replication/driver_test.go

71
sw-block/engine/replication/adapter.go

@@ -0,0 +1,71 @@
package replication
// === Phase 06: Storage and Control-Plane Adapter Interfaces ===
//
// These interfaces define the boundary between the engine replication core
// and external systems (storage backend, coordinator/control plane).
// The engine consumes these interfaces — it does not reach into storage
// or control-plane internals directly.
// StorageAdapter is the engine's view of the storage backend. It supplies
// real retained-history and checkpoint state, and lets the engine hold
// (pin) snapshots and WAL ranges so they stay available for the duration
// of a recovery, grounding decisions in actual data rather than
// reconstructed test inputs.
type StorageAdapter interface {
	// GetRetainedHistory reports the backend's current WAL retention
	// state, reflecting the actual TailLSN, HeadLSN, CommittedLSN, and
	// checkpoint.
	GetRetainedHistory() RetainedHistory

	// PinSnapshot takes a hold on the checkpoint/base image at
	// checkpointLSN for rebuild use; a pinned snapshot must not be
	// garbage-collected. An error is returned when no valid snapshot
	// exists at that LSN.
	PinSnapshot(checkpointLSN uint64) (SnapshotPin, error)

	// ReleaseSnapshot drops a hold previously taken via PinSnapshot.
	ReleaseSnapshot(pin SnapshotPin)

	// PinWALRetention holds WAL entries from startLSN onward so the tail
	// cannot be reclaimed past the range a catch-up still needs. The
	// engine calls this before starting catch-up.
	PinWALRetention(startLSN uint64) (RetentionPin, error)

	// ReleaseWALRetention drops a hold previously taken via
	// PinWALRetention.
	ReleaseWALRetention(pin RetentionPin)
}
// SnapshotPin is a handle to a pinned snapshot/checkpoint reference.
type SnapshotPin struct {
	LSN   uint64 // LSN of the pinned snapshot
	PinID uint64 // unique identifier for this pin
	Valid bool   // set by the adapter when the pin is granted
}
// RetentionPin is a handle to a held WAL retention range.
type RetentionPin struct {
	StartLSN uint64 // first LSN of the held range
	PinID    uint64 // unique identifier for this pin
	Valid    bool   // set by the adapter when the pin is granted
}
// ControlPlaneAdapter translates external control-plane events
// (heartbeats, failovers) into AssignmentIntent values that the
// orchestrator can apply.
type ControlPlaneAdapter interface {
	// HandleHeartbeat ingests a heartbeat from one volume server and
	// returns whatever assignment updates should be applied as a result.
	HandleHeartbeat(serverID string, volumes []VolumeHeartbeat) []AssignmentIntent

	// HandleFailover reacts to a failover event and returns new
	// assignments for the replicas the dead server hosted.
	HandleFailover(deadServerID string) []AssignmentIntent
}
// VolumeHeartbeat carries a single volume's state inside a heartbeat.
type VolumeHeartbeat struct {
	VolumeID    string // volume being reported
	ReplicaID   string // replica hosted on the reporting server
	Epoch       uint64 // replication epoch the replica reports
	FlushedLSN  uint64 // LSN the replica reports as flushed
	State       string // replica state as reported by the server
	DataAddr    string // advertised data-plane address
	CtrlAddr    string // advertised control-plane address
	AddrVersion uint64 // version of the advertised addresses
}

160
sw-block/engine/replication/driver.go

@@ -0,0 +1,160 @@
package replication
import "fmt"
// === Phase 06: Execution Driver ===
//
// Convenience flow classification (Phase 06 P0):
//
// ProcessAssignment → stepwise engine task (real entry point)
// ExecuteRecovery → planner (connect + classify outcome)
// CompleteCatchUp → TEST-ONLY convenience (bundles plan+execute+complete)
// CompleteRebuild → TEST-ONLY convenience (bundles plan+execute+complete)
// UpdateSenderEpoch → stepwise engine task
// InvalidateEpoch → stepwise engine task
//
// The real engine flow splits catch-up and rebuild into:
// 1. Plan: acquire resources (pin WAL or snapshot)
// 2. Execute: stream entries stepwise (not one-shot)
// 3. Complete: release resources, transition to InSync
//
// RecoveryDriver is the Phase 06 replacement for the synchronous
// convenience helpers. It plans, acquires resources, and provides
// a stepwise execution interface.
// RecoveryPlan captures a planned recovery operation together with any
// storage resources acquired for it. A pin field is non-nil only when
// that resource was actually acquired during planning.
type RecoveryPlan struct {
	ReplicaID string
	SessionID uint64
	Outcome   RecoveryOutcome
	Proof     *RecoverabilityProof

	// Resource pins (non-nil when resources are acquired).
	RetentionPin *RetentionPin // WAL hold taken for catch-up
	SnapshotPin  *SnapshotPin  // snapshot hold taken for rebuild

	// Targets.
	CatchUpTarget uint64        // catch-up: LSN the replica must reach
	TruncateLSN   uint64        // non-zero when truncation is required first
	RebuildSource RebuildSource // where rebuild data comes from
}
// RecoveryDriver plans and executes recovery operations from real
// storage-adapter inputs. It supersedes the synchronous convenience
// helpers (CompleteCatchUp, CompleteRebuild) with a resource-aware,
// stepwise execution model.
type RecoveryDriver struct {
	Orchestrator *RecoveryOrchestrator // engine core driving recovery sessions
	Storage      StorageAdapter        // source of real retention state and pins
}
// NewRecoveryDriver builds a driver around the given storage adapter,
// wiring in a freshly constructed orchestrator.
func NewRecoveryDriver(storage StorageAdapter) *RecoveryDriver {
	d := &RecoveryDriver{Orchestrator: NewRecoveryOrchestrator()}
	d.Storage = storage
	return d
}
// PlanRecovery connects and handshakes from real storage state, classifies
// the outcome, and acquires whatever resources that outcome needs (a WAL
// retention pin for catch-up; nothing for zero-gap or needs-rebuild).
// The returned plan is executed stepwise by the caller and should
// eventually be passed to ReleasePlan.
func (d *RecoveryDriver) PlanRecovery(replicaID string, replicaFlushedLSN uint64) (*RecoveryPlan, error) {
	hist := d.Storage.GetRetainedHistory()

	res := d.Orchestrator.ExecuteRecovery(replicaID, replicaFlushedLSN, &hist)
	if res.Error != nil {
		return nil, res.Error
	}

	p := &RecoveryPlan{
		ReplicaID: replicaID,
		SessionID: d.Orchestrator.Registry.Sender(replicaID).SessionID(),
		Outcome:   res.Outcome,
		Proof:     res.Proof,
	}

	if res.Outcome != OutcomeCatchUp {
		// Zero-gap was already completed inside ExecuteRecovery, and a
		// rebuild needs an assignment first — neither acquires resources.
		return p, nil
	}

	// Catch-up: hold the WAL range before streaming starts.
	pin, err := d.Storage.PinWALRetention(replicaFlushedLSN)
	if err != nil {
		d.Orchestrator.Log.Record(replicaID, p.SessionID, "wal_pin_failed", err.Error())
		return nil, fmt.Errorf("WAL retention pin failed: %w", err)
	}
	p.RetentionPin = &pin
	p.CatchUpTarget = hist.CommittedLSN

	// A replica flushed past the committed history must truncate first.
	if proof := hist.ProveRecoverability(replicaFlushedLSN); proof.Reason == "replica_ahead_needs_truncation" {
		p.TruncateLSN = hist.CommittedLSN
	}

	d.Orchestrator.Log.Record(replicaID, p.SessionID, "plan_catchup",
		fmt.Sprintf("target=%d pin=%d truncate=%d", p.CatchUpTarget, pin.PinID, p.TruncateLSN))
	return p, nil
}
// PlanRebuild acquires rebuild resources from real storage state: a
// snapshot pin plus a WAL pin for tail replay when the snapshot+tail path
// is usable, or no pins at all when a full base transfer is required.
// Called after a rebuild assignment; release via ReleasePlan.
func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) {
	hist := d.Storage.GetRetainedHistory()
	source, snapLSN := hist.RebuildSourceDecision()

	p := &RecoveryPlan{
		ReplicaID:     replicaID,
		SessionID:     d.Orchestrator.Registry.Sender(replicaID).SessionID(),
		Outcome:       OutcomeNeedsRebuild,
		RebuildSource: source,
	}

	if source != RebuildSnapshotTail {
		// Full base transfer: nothing to pin up front.
		d.Orchestrator.Log.Record(replicaID, p.SessionID, "plan_rebuild_full_base", "")
		return p, nil
	}

	// Hold the base image for the duration of the rebuild.
	snapPin, err := d.Storage.PinSnapshot(snapLSN)
	if err != nil {
		d.Orchestrator.Log.Record(replicaID, p.SessionID, "snapshot_pin_failed", err.Error())
		return nil, fmt.Errorf("snapshot pin failed: %w", err)
	}
	p.SnapshotPin = &snapPin

	// Hold the WAL from the snapshot LSN so the tail replay stays available.
	retPin, err := d.Storage.PinWALRetention(snapLSN)
	if err != nil {
		// Roll back the snapshot hold so a failed plan leaks nothing.
		d.Storage.ReleaseSnapshot(snapPin)
		d.Orchestrator.Log.Record(replicaID, p.SessionID, "wal_pin_failed", err.Error())
		return nil, fmt.Errorf("WAL retention pin failed: %w", err)
	}
	p.RetentionPin = &retPin

	d.Orchestrator.Log.Record(replicaID, p.SessionID, "plan_rebuild_snapshot_tail",
		fmt.Sprintf("snapshot=%d", snapLSN))
	return p, nil
}
// ReleasePlan releases any resources acquired by a plan. It is idempotent
// (each pin is cleared as it is released) and tolerates a nil plan, so
// callers may release unconditionally even on planning-error paths
// (PlanRecovery/PlanRebuild return a nil plan on error).
func (d *RecoveryDriver) ReleasePlan(plan *RecoveryPlan) {
	if plan == nil {
		// Nothing was planned, so nothing is held.
		return
	}
	if plan.RetentionPin != nil {
		d.Storage.ReleaseWALRetention(*plan.RetentionPin)
		plan.RetentionPin = nil
	}
	if plan.SnapshotPin != nil {
		d.Storage.ReleaseSnapshot(*plan.SnapshotPin)
		plan.SnapshotPin = nil
	}
}

344
sw-block/engine/replication/driver_test.go

@@ -0,0 +1,344 @@
package replication
import (
"fmt"
"sync/atomic"
"testing"
)
// ============================================================
// Phase 06 P0/P1: Recovery driver tests with mock storage adapter
// ============================================================
// --- Mock storage adapter ---

// mockStorage is an in-memory StorageAdapter for driver tests. It tracks
// outstanding pins by ID and can be told to refuse pin requests.
type mockStorage struct {
	history   RetainedHistory
	nextPinID atomic.Uint64

	pinnedSnaps map[uint64]bool // live snapshot pins, keyed by PinID
	pinnedWAL   map[uint64]bool // live WAL retention pins, keyed by PinID

	failSnapshotPin bool // when set, PinSnapshot refuses
	failWALPin      bool // when set, PinWALRetention refuses
}
// newMockStorage returns a mock adapter serving the given history with no
// pins outstanding.
func newMockStorage(history RetainedHistory) *mockStorage {
	m := &mockStorage{history: history}
	m.pinnedSnaps = make(map[uint64]bool)
	m.pinnedWAL = make(map[uint64]bool)
	return m
}
// GetRetainedHistory returns the fixed history this mock was built with.
func (m *mockStorage) GetRetainedHistory() RetainedHistory {
	return m.history
}
// PinSnapshot grants a snapshot pin with a fresh ID, or refuses when the
// test has set failSnapshotPin.
func (m *mockStorage) PinSnapshot(lsn uint64) (SnapshotPin, error) {
	if m.failSnapshotPin {
		return SnapshotPin{}, fmt.Errorf("snapshot pin refused")
	}
	pin := SnapshotPin{LSN: lsn, PinID: m.nextPinID.Add(1), Valid: true}
	m.pinnedSnaps[pin.PinID] = true
	return pin, nil
}
// ReleaseSnapshot forgets a snapshot pin; releasing twice is harmless.
func (m *mockStorage) ReleaseSnapshot(pin SnapshotPin) {
	delete(m.pinnedSnaps, pin.PinID)
}
// PinWALRetention grants a WAL retention pin with a fresh ID, or refuses
// when the test has set failWALPin.
func (m *mockStorage) PinWALRetention(startLSN uint64) (RetentionPin, error) {
	if m.failWALPin {
		return RetentionPin{}, fmt.Errorf("WAL retention pin refused")
	}
	pin := RetentionPin{StartLSN: startLSN, PinID: m.nextPinID.Add(1), Valid: true}
	m.pinnedWAL[pin.PinID] = true
	return pin, nil
}
// ReleaseWALRetention forgets a WAL pin; releasing twice is harmless.
func (m *mockStorage) ReleaseWALRetention(pin RetentionPin) {
	delete(m.pinnedWAL, pin.PinID)
}
// --- Plan + execute: catch-up ---

// TestDriver_PlanRecovery_CatchUp plans a catch-up, verifies a WAL pin was
// taken, then executes the catch-up and releases the plan's resources.
func TestDriver_PlanRecovery_CatchUp(t *testing.T) {
	store := newMockStorage(RetainedHistory{HeadLSN: 100, TailLSN: 30, CommittedLSN: 100})
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	p, err := drv.PlanRecovery("r1", 70)
	if err != nil {
		t.Fatal(err)
	}
	if p.Outcome != OutcomeCatchUp {
		t.Fatalf("outcome=%s", p.Outcome)
	}
	if p.RetentionPin == nil {
		t.Fatal("WAL retention should be pinned")
	}
	if p.CatchUpTarget != 100 {
		t.Fatalf("target=%d", p.CatchUpTarget)
	}
	if !p.Proof.Recoverable {
		t.Fatalf("proof: %s", p.Proof.Reason)
	}

	// WAL is pinned.
	if got := len(store.pinnedWAL); got != 1 {
		t.Fatalf("expected 1 WAL pin, got %d", got)
	}

	// Execute catch-up via orchestrator, then release resources.
	drv.Orchestrator.CompleteCatchUp("r1", CatchUpOptions{TargetLSN: p.CatchUpTarget})
	drv.ReleasePlan(p)
	if len(store.pinnedWAL) != 0 {
		t.Fatal("WAL pin should be released")
	}
}
// --- Plan + execute: zero-gap ---

// TestDriver_PlanRecovery_ZeroGap verifies that a fully caught-up replica
// completes immediately without pinning any resources.
func TestDriver_PlanRecovery_ZeroGap(t *testing.T) {
	store := newMockStorage(RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100})
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	p, err := drv.PlanRecovery("r1", 100)
	if err != nil {
		t.Fatal(err)
	}
	if p.Outcome != OutcomeZeroGap {
		t.Fatalf("outcome=%s", p.Outcome)
	}

	// Zero-gap: no resources pinned.
	if p.RetentionPin != nil {
		t.Fatal("zero-gap should not pin WAL")
	}

	// Already completed.
	sender := drv.Orchestrator.Registry.Sender("r1")
	if sender.State() != StateInSync {
		t.Fatalf("state=%s", sender.State())
	}
}
// --- Plan + execute: needs rebuild ---

// TestDriver_PlanRecovery_NeedsRebuild_ThenRebuild walks the full path:
// catch-up planning classifies as needs-rebuild, a rebuild is assigned,
// the rebuild plan picks full-base (checkpoint behind the WAL tail), and
// the rebuild completes to InSync.
func TestDriver_PlanRecovery_NeedsRebuild_ThenRebuild(t *testing.T) {
	store := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 60, CommittedLSN: 100,
		CheckpointLSN: 50, CheckpointTrusted: true,
	})
	drv := NewRecoveryDriver(store)
	assign := func(kind SessionKind) {
		drv.Orchestrator.ProcessAssignment(AssignmentIntent{
			Replicas: []ReplicaAssignment{
				{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
			},
			Epoch:           1,
			RecoveryTargets: map[string]SessionKind{"r1": kind},
		})
	}
	assign(SessionCatchUp)

	// Plan: catch-up fails.
	p, err := drv.PlanRecovery("r1", 30)
	if err != nil {
		t.Fatal(err)
	}
	if p.Outcome != OutcomeNeedsRebuild {
		t.Fatalf("outcome=%s", p.Outcome)
	}

	// Rebuild assignment, then plan rebuild with resource acquisition.
	assign(SessionRebuild)
	rp, err := drv.PlanRebuild("r1")
	if err != nil {
		t.Fatal(err)
	}
	// Checkpoint at 50, tail at 60 → unreplayable → full base.
	if rp.RebuildSource != RebuildFullBase {
		t.Fatalf("source=%s (checkpoint at 50 but tail at 60)", rp.RebuildSource)
	}

	// Execute rebuild via orchestrator.
	drv.Orchestrator.CompleteRebuild("r1", &store.history)
	if st := drv.Orchestrator.Registry.Sender("r1").State(); st != StateInSync {
		t.Fatalf("state=%s", st)
	}
}
// --- Resource failure: WAL pin refused ---

// TestDriver_PlanRecovery_WALPinFailure: a refused WAL pin fails the plan
// and leaves a wal_pin_failed entry in the orchestrator log.
func TestDriver_PlanRecovery_WALPinFailure(t *testing.T) {
	store := newMockStorage(RetainedHistory{HeadLSN: 100, TailLSN: 30, CommittedLSN: 100})
	store.failWALPin = true
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	if _, err := drv.PlanRecovery("r1", 70); err == nil {
		t.Fatal("should fail when WAL pin is refused")
	}

	// Log should show the failure.
	var hasFailure bool
	for _, e := range drv.Orchestrator.Log.EventsFor("r1") {
		hasFailure = hasFailure || e.Event == "wal_pin_failed"
	}
	if !hasFailure {
		t.Fatal("log should contain wal_pin_failed")
	}
}
// --- Resource failure: snapshot pin refused → fallback ---

// TestDriver_PlanRebuild_SnapshotPinFailure: a refused snapshot pin fails
// the rebuild plan and leaves a snapshot_pin_failed entry in the log.
func TestDriver_PlanRebuild_SnapshotPinFailure(t *testing.T) {
	store := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
		CheckpointLSN: 50, CheckpointTrusted: true,
	})
	store.failSnapshotPin = true
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild},
	})

	if _, err := drv.PlanRebuild("r1"); err == nil {
		t.Fatal("should fail when snapshot pin is refused")
	}

	var hasFailure bool
	for _, e := range drv.Orchestrator.Log.EventsFor("r1") {
		hasFailure = hasFailure || e.Event == "snapshot_pin_failed"
	}
	if !hasFailure {
		t.Fatal("log should contain snapshot_pin_failed")
	}
}
// --- Replica-ahead with truncation through driver ---

// TestDriver_PlanRecovery_ReplicaAhead_Truncation: a replica flushed past
// the committed LSN gets a catch-up plan carrying a truncation target.
func TestDriver_PlanRecovery_ReplicaAhead_Truncation(t *testing.T) {
	store := newMockStorage(RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100})
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	p, err := drv.PlanRecovery("r1", 105) // replica ahead
	if err != nil {
		t.Fatal(err)
	}
	if p.Outcome != OutcomeCatchUp {
		t.Fatalf("outcome=%s", p.Outcome)
	}
	if p.TruncateLSN != 100 {
		t.Fatalf("truncate=%d, want 100", p.TruncateLSN)
	}

	// Execute with truncation.
	opts := CatchUpOptions{TargetLSN: p.CatchUpTarget, TruncateLSN: p.TruncateLSN}
	if err := drv.Orchestrator.CompleteCatchUp("r1", opts); err != nil {
		t.Fatalf("catch-up with truncation: %v", err)
	}
	drv.ReleasePlan(p)
}
// --- Cross-layer contract: storage proves recoverability ---

// TestDriver_CrossLayer_StorageProvesRecoverability: the engine asks "is
// this recoverable?" and the storage adapter answers from real state —
// not from test-reconstructed inputs.
func TestDriver_CrossLayer_StorageProvesRecoverability(t *testing.T) {
	store := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 50, CommittedLSN: 100,
		CheckpointLSN: 40, CheckpointTrusted: true,
	})
	drv := NewRecoveryDriver(store)
	drv.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})

	// Engine asks storage for recoverability proof (gap 60→100).
	hist := store.GetRetainedHistory()
	proof := hist.ProveRecoverability(60)
	if !proof.Recoverable {
		t.Fatalf("storage should prove recoverable: %s", proof.Reason)
	}

	// Engine asks for rebuild source decision.
	// Checkpoint at 40, tail at 50 → checkpoint < tail → unreplayable.
	source, snapLSN := hist.RebuildSourceDecision()
	if source != RebuildFullBase {
		t.Fatalf("source=%s snap=%d (checkpoint 40 < tail 50)", source, snapLSN)
	}

	// Failure is observable: log from PlanRecovery.
	p, _ := drv.PlanRecovery("r1", 60)
	if p.Proof == nil || !p.Proof.Recoverable {
		t.Fatal("plan should carry proof from storage")
	}
	drv.ReleasePlan(p)
}
Loading…
Cancel
Save