diff --git a/sw-block/engine/replication/history.go b/sw-block/engine/replication/history.go new file mode 100644 index 000000000..02225e16a --- /dev/null +++ b/sw-block/engine/replication/history.go @@ -0,0 +1,112 @@ +package replication + +import "fmt" + +// RetainedHistory represents the primary's WAL retention state as seen +// by the recovery decision path. It answers "why is recovery allowed?" +// with executable proof, not just policy assertions. +// +// This is the engine-level equivalent of the prototype's WALHistory — +// it provides the recoverability inputs that ClassifyRecoveryOutcome +// and rebuild-source selection consume. +type RetainedHistory struct { + // HeadLSN is the highest LSN written to the primary WAL. + HeadLSN uint64 + + // TailLSN is the oldest retained LSN boundary (exclusive). + // Entries with LSN > TailLSN are available for catch-up. + // Entries with LSN <= TailLSN have been recycled. + TailLSN uint64 + + // CommittedLSN is the lineage-safe boundary — the highest LSN + // acknowledged as durable by the commit protocol. + CommittedLSN uint64 + + // CheckpointLSN is the highest LSN with a durable base image. + // Used for rebuild-source decision: if CheckpointLSN > 0 and + // the checkpoint is trusted, snapshot+tail rebuild is possible. + CheckpointLSN uint64 + + // CheckpointTrusted indicates whether the checkpoint base image + // is known to be consistent and usable for rebuild. + CheckpointTrusted bool +} + +// MakeHandshakeResult generates a HandshakeResult from the primary's +// retained history and a replica's reported flushed LSN. +func (rh *RetainedHistory) MakeHandshakeResult(replicaFlushedLSN uint64) HandshakeResult { + retentionStart := rh.TailLSN + 1 + if rh.TailLSN == 0 { + retentionStart = 0 + } + return HandshakeResult{ + ReplicaFlushedLSN: replicaFlushedLSN, + CommittedLSN: rh.CommittedLSN, + RetentionStartLSN: retentionStart, + } +} + +// IsRecoverable checks whether all entries from startExclusive+1 to +// endInclusive are available in the retained WAL. +func (rh *RetainedHistory) IsRecoverable(startExclusive, endInclusive uint64) bool { + if startExclusive < rh.TailLSN { + return false + } + if endInclusive > rh.HeadLSN { + return false + } + return true +} + +// RebuildSourceDecision determines the optimal rebuild source from +// the current retained history state. +func (rh *RetainedHistory) RebuildSourceDecision() (source RebuildSource, snapshotLSN uint64) { + if rh.CheckpointTrusted && rh.CheckpointLSN > 0 { + return RebuildSnapshotTail, rh.CheckpointLSN + } + return RebuildFullBase, 0 +} + +// RecoverabilityProof explains why a gap is or is not recoverable. +type RecoverabilityProof struct { + ReplicaFlushedLSN uint64 + CommittedLSN uint64 + TailLSN uint64 + HeadLSN uint64 + Recoverable bool + Reason string +} + +// ProveRecoverability generates an explicit proof for a recovery decision. +func (rh *RetainedHistory) ProveRecoverability(replicaFlushedLSN uint64) RecoverabilityProof { + proof := RecoverabilityProof{ + ReplicaFlushedLSN: replicaFlushedLSN, + CommittedLSN: rh.CommittedLSN, + TailLSN: rh.TailLSN, + HeadLSN: rh.HeadLSN, + } + + if replicaFlushedLSN == rh.CommittedLSN { + proof.Recoverable = true + proof.Reason = "zero_gap" + return proof + } + + if replicaFlushedLSN > rh.CommittedLSN { + proof.Recoverable = true + proof.Reason = "replica_ahead_needs_truncation" + return proof + } + + if rh.IsRecoverable(replicaFlushedLSN, rh.CommittedLSN) { + proof.Recoverable = true + proof.Reason = fmt.Sprintf("gap_within_retention: need LSN %d-%d, tail=%d head=%d", + replicaFlushedLSN+1, rh.CommittedLSN, rh.TailLSN, rh.HeadLSN) + return proof + } + + proof.Recoverable = false + proof.Reason = fmt.Sprintf("gap_beyond_retention: need LSN %d but tail=%d", + replicaFlushedLSN+1, rh.TailLSN) + return proof +} diff --git a/sw-block/engine/replication/recoverability_test.go b/sw-block/engine/replication/recoverability_test.go new file mode 100644 index 000000000..6f43bfe01 --- /dev/null +++ b/sw-block/engine/replication/recoverability_test.go @@ -0,0 +1,313 @@ +package replication + +import "testing" + +// ============================================================ +// Phase 05 Slice 3: Engine Data / Recoverability Core +// +// Tests validate that recovery decisions are backed by actual +// retained-history state, not just policy assertions. +// ============================================================ + +// --- Recoverable vs unrecoverable gap --- + +func TestHistory_Recoverable_GapWithinRetention(t *testing.T) { + rh := RetainedHistory{ + HeadLSN: 100, + TailLSN: 30, + CommittedLSN: 100, + } + + if !rh.IsRecoverable(50, 100) { + t.Fatal("gap 50→100 should be recoverable (tail=30)") + } + + proof := rh.ProveRecoverability(50) + if !proof.Recoverable { + t.Fatalf("proof: %s", proof.Reason) + } +} + +func TestHistory_Unrecoverable_GapBeyondRetention(t *testing.T) { + rh := RetainedHistory{ + HeadLSN: 100, + TailLSN: 60, + CommittedLSN: 100, + } + + if rh.IsRecoverable(50, 100) { + t.Fatal("gap 50→100 should NOT be recoverable (tail=60)") + } + + proof := rh.ProveRecoverability(50) + if proof.Recoverable { + t.Fatal("proof should show unrecoverable") + } +} + +func TestHistory_ExactBoundary(t *testing.T) { + rh := RetainedHistory{ + HeadLSN: 100, + TailLSN: 50, + CommittedLSN: 100, + } + + // AT boundary: recoverable. + if !rh.IsRecoverable(50, 100) { + t.Fatal("exact boundary should be recoverable") + } + // ONE BELOW: unrecoverable. + if rh.IsRecoverable(49, 100) { + t.Fatal("below boundary should NOT be recoverable") + } +} + +func TestHistory_BeyondHead_Unrecoverable(t *testing.T) { + rh := RetainedHistory{ + HeadLSN: 80, + TailLSN: 0, + CommittedLSN: 100, + } + + if rh.IsRecoverable(0, 100) { + t.Fatal("gap beyond head should NOT be recoverable") + } +} + +// --- Trusted base vs no trusted base --- + +func TestHistory_RebuildSource_TrustedCheckpoint(t *testing.T) { + rh := RetainedHistory{ + CheckpointLSN: 50, + CheckpointTrusted: true, + CommittedLSN: 100, + } + + source, snapLSN := rh.RebuildSourceDecision() + if source != RebuildSnapshotTail { + t.Fatalf("source=%s, want snapshot_tail", source) + } + if snapLSN != 50 { + t.Fatalf("snapshot LSN=%d, want 50", snapLSN) + } +} + +func TestHistory_RebuildSource_NoCheckpoint(t *testing.T) { + rh := RetainedHistory{ + CommittedLSN: 100, + } + + source, snapLSN := rh.RebuildSourceDecision() + if source != RebuildFullBase { + t.Fatalf("source=%s, want full_base", source) + } + if snapLSN != 0 { + t.Fatalf("snapshot LSN=%d, want 0", snapLSN) + } +} + +func TestHistory_RebuildSource_UntrustedCheckpoint(t *testing.T) { + rh := RetainedHistory{ + CheckpointLSN: 50, + CheckpointTrusted: false, + CommittedLSN: 100, + } + + source, _ := rh.RebuildSourceDecision() + if source != RebuildFullBase { + t.Fatalf("untrusted checkpoint: source=%s, want full_base", source) + } +} + +// --- Handshake result from retained history --- + +func TestHistory_MakeHandshakeResult(t *testing.T) { + rh := RetainedHistory{ + HeadLSN: 100, + TailLSN: 30, + CommittedLSN: 90, + } + + hr := rh.MakeHandshakeResult(70) + if hr.ReplicaFlushedLSN != 70 { + t.Fatalf("replica=%d", hr.ReplicaFlushedLSN) + } + if hr.CommittedLSN != 90 { + t.Fatalf("committed=%d", hr.CommittedLSN) + } + if hr.RetentionStartLSN != 31 { + t.Fatalf("retention=%d, want 31", hr.RetentionStartLSN) + } + + outcome := ClassifyRecoveryOutcome(hr) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", outcome) + } +} + +// --- Recoverability proof --- + +func TestHistory_Proof_ZeroGap(t *testing.T) { + rh := RetainedHistory{CommittedLSN: 100} + proof := rh.ProveRecoverability(100) + if !proof.Recoverable || proof.Reason != "zero_gap" { + t.Fatalf("proof: recoverable=%v reason=%s", proof.Recoverable, proof.Reason) + } +} + +func TestHistory_Proof_ReplicaAhead(t *testing.T) { + rh := RetainedHistory{CommittedLSN: 100} + proof := rh.ProveRecoverability(105) + if !proof.Recoverable || proof.Reason != "replica_ahead_needs_truncation" { + t.Fatalf("proof: recoverable=%v reason=%s", proof.Recoverable, proof.Reason) + } +} + +func TestHistory_Proof_GapWithinRetention(t *testing.T) { + rh := RetainedHistory{HeadLSN: 100, TailLSN: 30, CommittedLSN: 100} + proof := rh.ProveRecoverability(50) + if !proof.Recoverable { + t.Fatalf("proof: %s", proof.Reason) + } +} + +func TestHistory_Proof_GapBeyondRetention(t *testing.T) { + rh := RetainedHistory{HeadLSN: 100, TailLSN: 60, CommittedLSN: 100} + proof := rh.ProveRecoverability(50) + if proof.Recoverable { + t.Fatal("should not be recoverable") + } +} + +// --- End-to-end: retained-history-driven recovery flow --- + +func TestHistory_E2E_RecoveryDrivenByRetainedHistory(t *testing.T) { + // Primary's retained history. + primary := RetainedHistory{ + HeadLSN: 100, + TailLSN: 30, + CommittedLSN: 100, + CheckpointLSN: 50, + CheckpointTrusted: true, + } + + r := NewRegistry() + r.ApplyAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}}, + {ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{ + "r1": SessionCatchUp, + "r2": SessionCatchUp, + }, + }) + + // r1: replica at LSN 70 — catch-up (within retention). + r1 := r.Sender("r1") + id1 := r1.SessionID() + r1.BeginConnect(id1) + + proof1 := primary.ProveRecoverability(70) + if !proof1.Recoverable { + t.Fatalf("r1: %s", proof1.Reason) + } + + hr1 := primary.MakeHandshakeResult(70) + o1, _ := r1.RecordHandshakeWithOutcome(id1, hr1) + if o1 != OutcomeCatchUp { + t.Fatalf("r1: outcome=%s", o1) + } + r1.BeginCatchUp(id1) + r1.RecordCatchUpProgress(id1, 100) + r1.CompleteSessionByID(id1) + + // r2: replica at LSN 20 — needs rebuild (beyond retention). + r2 := r.Sender("r2") + id2 := r2.SessionID() + r2.BeginConnect(id2) + + proof2 := primary.ProveRecoverability(20) + if proof2.Recoverable { + t.Fatal("r2: should not be recoverable") + } + + hr2 := primary.MakeHandshakeResult(20) + o2, _ := r2.RecordHandshakeWithOutcome(id2, hr2) + if o2 != OutcomeNeedsRebuild { + t.Fatalf("r2: outcome=%s", o2) + } + + // r2 needs rebuild — use history to choose source. + source, snapLSN := primary.RebuildSourceDecision() + if source != RebuildSnapshotTail || snapLSN != 50 { + t.Fatalf("rebuild source=%s snap=%d", source, snapLSN) + } + + // New rebuild session for r2. + r.ApplyAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r2": SessionRebuild}, + }) + + id2b := r2.SessionID() + r2.BeginConnect(id2b) + r2.RecordHandshake(id2b, 0, 100) + r2.SelectRebuildSource(id2b, snapLSN, true, primary.CommittedLSN) + r2.BeginRebuildTransfer(id2b) + r2.RecordRebuildTransferProgress(id2b, snapLSN) + r2.BeginRebuildTailReplay(id2b) + r2.RecordRebuildTailProgress(id2b, 100) + r2.CompleteRebuild(id2b) + + if r1.State() != StateInSync || r2.State() != StateInSync { + t.Fatalf("r1=%s r2=%s", r1.State(), r2.State()) + } + t.Log("e2e: r1 caught up (proof: gap within retention), r2 rebuilt (proof: gap beyond retention, snapshot+tail)") +} + +// --- Truncation / safe-boundary handling --- + +func TestHistory_Proof_TruncationRequired(t *testing.T) { + rh := RetainedHistory{CommittedLSN: 100} + + // Replica ahead → truncation required. + proof := rh.ProveRecoverability(105) + if proof.Reason != "replica_ahead_needs_truncation" { + t.Fatalf("reason=%s", proof.Reason) + } + + // Sender execution: handshake sets truncation requirement. + s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + id, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(id) + + hr := rh.MakeHandshakeResult(105) + outcome, _ := s.RecordHandshakeWithOutcome(id, hr) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", outcome) + } + + // Session should require truncation. + snap := s.SessionSnapshot() + if snap == nil { + t.Fatal("session should exist") + } + + // Completion without truncation rejected. + if s.CompleteSessionByID(id) { + t.Fatal("should reject completion without truncation") + } + + // Record truncation. + s.RecordTruncation(id, 100) + + // Now completion works (zero-gap after truncation). + if !s.CompleteSessionByID(id) { + t.Fatal("completion after truncation should succeed") + } +}