From c89709e47ee0b9b1daaae642aba7fd1634bddfd8 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sat, 28 Mar 2026 11:29:27 -0700 Subject: [PATCH] feat: add WAL history model and recoverability proof (Phase 04 P3) Adds minimal historical-data prototype to enginev2: - WALHistory: retained-prefix model with Append, Commit, AdvanceTail, Truncate, EntriesInRange, IsRecoverable, StateAt - MakeHandshakeResult connects WAL state to outcome classification - RecordTruncation execution API for divergent tail cleanup - CompleteSessionByID gates on truncation when required - Zero-gap requires exact equality (FlushedLSN == CommittedLSN) - Replica-ahead classified as CatchUp with mandatory truncation 15 new tests: WAL basics, provable recoverability, unprovable gap, exact boundary, truncation enforcement, WAL-backed end-to-end recovery with data verification. Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/prototype/enginev2/outcome.go | 17 +- sw-block/prototype/enginev2/p2_test.go | 18 +- sw-block/prototype/enginev2/p3_test.go | 419 ++++++++++++++++++++++ sw-block/prototype/enginev2/sender.go | 34 +- sw-block/prototype/enginev2/session.go | 5 + sw-block/prototype/enginev2/walhistory.go | 146 ++++++++ 6 files changed, 625 insertions(+), 14 deletions(-) create mode 100644 sw-block/prototype/enginev2/p3_test.go create mode 100644 sw-block/prototype/enginev2/walhistory.go diff --git a/sw-block/prototype/enginev2/outcome.go b/sw-block/prototype/enginev2/outcome.go index 0f0573761..032046227 100644 --- a/sw-block/prototype/enginev2/outcome.go +++ b/sw-block/prototype/enginev2/outcome.go @@ -20,16 +20,21 @@ const ( // ClassifyRecoveryOutcome determines the recovery path from handshake data. // // Uses CommittedLSN (not WAL head) as the target boundary. This is the -// lineage-safe recovery point — only acknowledged data counts. A replica -// with FlushedLSN > CommittedLSN has divergent/uncommitted tail that must -// NOT be treated as "already in sync." +// lineage-safe recovery point — only acknowledged data counts. // -// Decision matrix (matches CP13-5 gap analysis): -// - ReplicaFlushedLSN >= CommittedLSN → zero gap, has full committed prefix +// Zero-gap requires exact equality (ReplicaFlushedLSN == CommittedLSN). +// A replica with FlushedLSN > CommittedLSN has divergent/uncommitted tail +// that requires truncation before InSync — this prototype does not model +// truncation, so that case is classified as CatchUp (the catch-up path +// will set the correct range and the completion check ensures convergence +// to CommittedLSN exactly). +// +// Decision matrix: +// - ReplicaFlushedLSN == CommittedLSN → zero gap, exact match // - ReplicaFlushedLSN+1 >= RetentionStartLSN → recoverable via WAL catch-up // - otherwise → gap too large, needs rebuild func ClassifyRecoveryOutcome(result HandshakeResult) RecoveryOutcome { - if result.ReplicaFlushedLSN >= result.CommittedLSN { + if result.ReplicaFlushedLSN == result.CommittedLSN { return OutcomeZeroGap } if result.RetentionStartLSN == 0 || result.ReplicaFlushedLSN+1 >= result.RetentionStartLSN { diff --git a/sw-block/prototype/enginev2/p2_test.go b/sw-block/prototype/enginev2/p2_test.go index c1904a08d..fc7e81cec 100644 --- a/sw-block/prototype/enginev2/p2_test.go +++ b/sw-block/prototype/enginev2/p2_test.go @@ -19,17 +19,21 @@ func TestOutcome_ZeroGap(t *testing.T) { } } -func TestOutcome_ZeroGap_ReplicaAtCommitted(t *testing.T) { - // Replica has exactly the committed prefix — zero gap. - // Note: replica may have uncommitted tail beyond CommittedLSN; - // that is handled by truncation, not by recovery classification. +func TestOutcome_ReplicaAhead_NotZeroGap(t *testing.T) { + // Replica FlushedLSN > CommittedLSN — has divergent/uncommitted tail. + // Must NOT be classified as zero-gap (would skip truncation). + // Classified as CatchUp because the committed prefix is present and + // the "ahead" tail needs explicit handling before InSync. o := ClassifyRecoveryOutcome(HandshakeResult{ - ReplicaFlushedLSN: 100, + ReplicaFlushedLSN: 105, CommittedLSN: 100, RetentionStartLSN: 50, }) - if o != OutcomeZeroGap { - t.Fatalf("got %s, want zero_gap", o) + if o == OutcomeZeroGap { + t.Fatal("replica ahead of committed must NOT be zero_gap") + } + if o != OutcomeCatchUp { + t.Fatalf("got %s, want catchup", o) } } diff --git a/sw-block/prototype/enginev2/p3_test.go b/sw-block/prototype/enginev2/p3_test.go new file mode 100644 index 000000000..a5910d3f8 --- /dev/null +++ b/sw-block/prototype/enginev2/p3_test.go @@ -0,0 +1,419 @@ +package enginev2 + +import "testing" + +// ============================================================ +// Phase 04 P3: Historical data model and recoverability proof +// ============================================================ + +// --- WALHistory basics --- + +func TestWAL_AppendAndCommit(t *testing.T) { + w := NewWALHistory() + w.Append(WALEntry{LSN: 1, Epoch: 1, Block: 1, Value: 10}) + w.Append(WALEntry{LSN: 2, Epoch: 1, Block: 2, Value: 20}) + w.Commit(2) + + if w.HeadLSN() != 2 { + t.Fatalf("head=%d, want 2", w.HeadLSN()) + } + if w.CommittedLSN() != 2 { + t.Fatalf("committed=%d, want 2", w.CommittedLSN()) + } +} + +func TestWAL_AppendRejectsRegression(t *testing.T) { + w := NewWALHistory() + w.Append(WALEntry{LSN: 5}) + if err := w.Append(WALEntry{LSN: 3}); err == nil { + t.Fatal("should reject LSN regression") + } +} + +func TestWAL_CommitRejectsAheadOfHead(t *testing.T) { + w := NewWALHistory() + w.Append(WALEntry{LSN: 5}) + if err := w.Commit(10); err == nil { + t.Fatal("should reject commit ahead of head") + } +} + +func TestWAL_AdvanceTail_RecyclesEntries(t *testing.T) { + w := NewWALHistory() + for i := uint64(1); i <= 10; i++ { + w.Append(WALEntry{LSN: i, Block: i, Value: i * 10}) + } + w.Commit(10) + + w.AdvanceTail(5) // recycle LSNs 1-5 + + if w.TailLSN() != 5 { + t.Fatalf("tail=%d, want 5", w.TailLSN()) + } + if w.Len() != 5 { + t.Fatalf("retained=%d, want 5", w.Len()) + } + + // Entries 1-5 gone. + _, err := w.EntriesInRange(0, 5) + if err == nil { + t.Fatal("recycled range should return error") + } + + // Entries 6-10 available. + entries, err := w.EntriesInRange(5, 10) + if err != nil { + t.Fatalf("retained range should work: %v", err) + } + if len(entries) != 5 { + t.Fatalf("entries=%d, want 5", len(entries)) + } +} + +func TestWAL_Truncate_RemovesDivergentTail(t *testing.T) { + w := NewWALHistory() + for i := uint64(1); i <= 10; i++ { + w.Append(WALEntry{LSN: i, Block: i, Value: i * 10}) + } + w.Commit(7) // committed up to 7, entries 8-10 are uncommitted + + w.Truncate(7) // remove divergent tail + + if w.HeadLSN() != 7 { + t.Fatalf("head=%d after truncate, want 7", w.HeadLSN()) + } + if w.Len() != 7 { + t.Fatalf("retained=%d after truncate, want 7", w.Len()) + } +} + +func TestWAL_StateAt_HistoricalCorrectness(t *testing.T) { + w := NewWALHistory() + w.Append(WALEntry{LSN: 1, Block: 1, Value: 100}) + w.Append(WALEntry{LSN: 2, Block: 1, Value: 200}) // overwrites block 1 + w.Append(WALEntry{LSN: 3, Block: 2, Value: 300}) + + // State at LSN 1: block 1 = 100. + s1 := w.StateAt(1) + if s1[1] != 100 { + t.Fatalf("state@1: block 1 = %d, want 100", s1[1]) + } + + // State at LSN 2: block 1 = 200 (overwritten). + s2 := w.StateAt(2) + if s2[1] != 200 { + t.Fatalf("state@2: block 1 = %d, want 200", s2[1]) + } + + // State at LSN 3: block 1 = 200, block 2 = 300. + s3 := w.StateAt(3) + if s3[1] != 200 || s3[2] != 300 { + t.Fatalf("state@3: %v", s3) + } +} + +// --- Recoverability proof: "why is catch-up allowed?" --- + +func TestRecoverability_Provable_GapWithinRetention(t *testing.T) { + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 10, Value: i}) + } + primary.Commit(100) + primary.AdvanceTail(30) // retain LSNs 31-100 + + // Replica at LSN 50 — gap is 51-100, within retention. + if !primary.IsRecoverable(50, 100) { + t.Fatal("gap 50→100 should be recoverable (tail=30)") + } + + entries, err := primary.EntriesInRange(50, 100) + if err != nil { + t.Fatalf("entries should be available: %v", err) + } + if len(entries) != 50 { + t.Fatalf("entries=%d, want 50", len(entries)) + } + + // Handshake result proves recoverability. + hr := primary.MakeHandshakeResult(50) + outcome := ClassifyRecoveryOutcome(hr) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s, want catchup", outcome) + } +} + +func TestRecoverability_Unprovable_GapBeyondRetention(t *testing.T) { + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 10, Value: i}) + } + primary.Commit(100) + primary.AdvanceTail(60) // retain only LSNs 61-100 + + // Replica at LSN 50 — gap is 51-100, but 51-60 are recycled. + if primary.IsRecoverable(50, 100) { + t.Fatal("gap 50→100 should NOT be recoverable (tail=60)") + } + + _, err := primary.EntriesInRange(50, 100) + if err == nil { + t.Fatal("recycled entries should return error") + } + + hr := primary.MakeHandshakeResult(50) + outcome := ClassifyRecoveryOutcome(hr) + if outcome != OutcomeNeedsRebuild { + t.Fatalf("outcome=%s, want needs_rebuild", outcome) + } +} + +func TestRecoverability_ExactBoundary(t *testing.T) { + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1}) + } + primary.Commit(100) + primary.AdvanceTail(50) // retain 51-100 + + // Replica at LSN 50 — gap starts at exactly tailLSN. Recoverable. + if !primary.IsRecoverable(50, 100) { + t.Fatal("exact boundary should be recoverable") + } + + // Replica at LSN 49 — gap starts before tailLSN. NOT recoverable. + if primary.IsRecoverable(49, 100) { + t.Fatal("one-below boundary should NOT be recoverable") + } +} + +// --- Truncation: divergent tail handling --- + +func TestTruncation_ReplicaAhead_RequiresTruncateBeforeInSync(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + // Replica is ahead of committed (divergent uncommitted tail). + outcome, err := s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{ + ReplicaFlushedLSN: 105, + CommittedLSN: 100, + RetentionStartLSN: 50, + }) + if err != nil { + t.Fatal(err) + } + if outcome != OutcomeCatchUp { + t.Fatalf("replica ahead should be catchup (needs truncation), got %s", outcome) + } + + // Session requires truncation. + if !sess.TruncateRequired { + t.Fatal("session should require truncation") + } + if sess.TruncateToLSN != 100 { + t.Fatalf("truncate to=%d, want 100", sess.TruncateToLSN) + } + + // Completion WITHOUT truncation recorded — rejected. + if s.CompleteSessionByID(sess.ID) { + t.Fatal("completion without truncation should be rejected") + } + + // Record truncation. + if err := s.RecordTruncation(sess.ID, 100); err != nil { + t.Fatalf("truncation record: %v", err) + } + + // Now completion succeeds (zero-gap after truncation). + if !s.CompleteSessionByID(sess.ID) { + t.Fatal("completion after truncation should succeed") + } + if s.State != StateInSync { + t.Fatalf("state=%s, want in_sync", s.State) + } +} + +func TestTruncation_WrongLSN_Rejected(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{ + ReplicaFlushedLSN: 105, + CommittedLSN: 100, + RetentionStartLSN: 50, + }) + + // Wrong truncation LSN. + if err := s.RecordTruncation(sess.ID, 99); err == nil { + t.Fatal("wrong truncation LSN should be rejected") + } +} + +func TestTruncation_NotRequired_Rejected(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + // Normal gap (replica behind, not ahead). + s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{ + ReplicaFlushedLSN: 80, + CommittedLSN: 100, + RetentionStartLSN: 50, + }) + + // Truncation not required. + if err := s.RecordTruncation(sess.ID, 100); err == nil { + t.Fatal("truncation on non-truncate session should be rejected") + } +} + +// --- End-to-end: WAL-backed recovery with data verification --- + +func TestE2E_WALBacked_CatchUpRecovery(t *testing.T) { + // Primary WAL with history. + primary := NewWALHistory() + for i := uint64(1); i <= 50; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + primary.Commit(50) + primary.AdvanceTail(10) // retain 11-50 + + // Replica WAL (simulates a replica that fell behind at LSN 30). + replica := NewWALHistory() + for i := uint64(1); i <= 30; i++ { + replica.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + + // Sender + session. + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + // Handshake with WAL-backed result. + hr := primary.MakeHandshakeResult(30) + outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", outcome) + } + + // Verify the gap is provably recoverable. + if !primary.IsRecoverable(30, 50) { + t.Fatal("gap should be provably recoverable from WAL") + } + + // Get catch-up entries. + entries, err := primary.EntriesInRange(30, 50) + if err != nil { + t.Fatalf("catch-up entries: %v", err) + } + + // Apply to replica. + s.BeginCatchUp(sess.ID) + for _, e := range entries { + replica.Append(e) + s.RecordCatchUpProgress(sess.ID, e.LSN) + } + + // Complete. + if !s.CompleteSessionByID(sess.ID) { + t.Fatal("completion should succeed") + } + + // Verify historical data correctness: replica state at committed LSN + // matches primary state at committed LSN. + primaryState := primary.StateAt(50) + replicaState := replica.StateAt(50) + for block, pVal := range primaryState { + if rVal := replicaState[block]; rVal != pVal { + t.Fatalf("block %d: primary=%d replica=%d", block, pVal, rVal) + } + } + t.Logf("WAL-backed recovery: gap 30→50 proven recoverable, data verified") +} + +func TestE2E_WALBacked_RebuildFallback(t *testing.T) { + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + primary.Commit(100) + primary.AdvanceTail(60) // only retain 61-100 + + // Sender + session. + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + // Handshake: replica at LSN 30, but retention starts at 61. Unrecoverable. + hr := primary.MakeHandshakeResult(30) + outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr) + if outcome != OutcomeNeedsRebuild { + t.Fatalf("outcome=%s, want needs_rebuild", outcome) + } + + // WAL proves WHY rebuild is needed. + if primary.IsRecoverable(30, 100) { + t.Fatal("gap should NOT be recoverable") + } + + // Sender state. + if s.State != StateNeedsRebuild { + t.Fatalf("state=%s", s.State) + } + t.Logf("WAL-backed rebuild: gap 30→100 proven unrecoverable (tail=60)") +} + +func TestE2E_WALBacked_TruncateThenInSync(t *testing.T) { + // Replica is ahead of committed — has divergent tail. + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + primary.Commit(100) + + replica := NewWALHistory() + for i := uint64(1); i <= 105; i++ { + replica.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + // Entries 101-105 are divergent (uncommitted on primary). + + // Sender. + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + s.BeginConnect(sess.ID) + + hr := primary.MakeHandshakeResult(105) + outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", outcome) + } + if !sess.TruncateRequired { + t.Fatal("truncation should be required") + } + + // Truncate replica's divergent tail. + replica.Truncate(100) + if replica.HeadLSN() != 100 { + t.Fatalf("replica head after truncate=%d, want 100", replica.HeadLSN()) + } + + // Record truncation in session. + s.RecordTruncation(sess.ID, 100) + + // Complete (zero-gap after truncation). + if !s.CompleteSessionByID(sess.ID) { + t.Fatal("completion after truncation should succeed") + } + + // Verify data matches at committed boundary. + primaryState := primary.StateAt(100) + replicaState := replica.StateAt(100) + for block, pVal := range primaryState { + if rVal := replicaState[block]; rVal != pVal { + t.Fatalf("block %d: primary=%d replica=%d", block, pVal, rVal) + } + } + t.Logf("truncate-then-InSync: divergent tail removed, data verified at committed=100") +} diff --git a/sw-block/prototype/enginev2/sender.go b/sw-block/prototype/enginev2/sender.go index d9020222d..b82941308 100644 --- a/sw-block/prototype/enginev2/sender.go +++ b/sw-block/prototype/enginev2/sender.go @@ -158,6 +158,10 @@ func (s *Sender) CompleteSessionByID(sessionID uint64) bool { return false } sess := s.session + // Truncation gate: if truncation was required, it must be recorded. + if sess.TruncateRequired && !sess.TruncateRecorded { + return false + } switch sess.Phase { case PhaseCatchUp: if !sess.Converged() { @@ -255,11 +259,39 @@ func (s *Sender) RecordHandshakeWithOutcome(sessionID uint64, result HandshakeRe case OutcomeZeroGap: s.session.SetRange(result.ReplicaFlushedLSN, result.ReplicaFlushedLSN) case OutcomeCatchUp: - s.session.SetRange(result.ReplicaFlushedLSN, result.CommittedLSN) + if result.ReplicaFlushedLSN > result.CommittedLSN { + // Replica ahead of committed — divergent tail needs truncation. + s.session.TruncateRequired = true + s.session.TruncateToLSN = result.CommittedLSN + // Catch-up range is zero after truncation (already has committed prefix). + s.session.SetRange(result.CommittedLSN, result.CommittedLSN) + } else { + s.session.SetRange(result.ReplicaFlushedLSN, result.CommittedLSN) + } } return outcome, nil } +// RecordTruncation confirms that the replica's divergent tail has been truncated. +// Must be called before completion when session.TruncateRequired is true. +// Rejects: wrong sessionID, truncation not required, already recorded. +func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if !s.session.TruncateRequired { + return fmt.Errorf("truncation not required for this session") + } + if truncatedToLSN != s.session.TruncateToLSN { + return fmt.Errorf("truncation LSN mismatch: expected %d, got %d", + s.session.TruncateToLSN, truncatedToLSN) + } + s.session.TruncateRecorded = true + return nil +} + // BeginCatchUp transitions the session from handshake to catch-up phase. // Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp. // Rejects: wrong sessionID, wrong phase. diff --git a/sw-block/prototype/enginev2/session.go b/sw-block/prototype/enginev2/session.go index 922138af6..f88418c7d 100644 --- a/sw-block/prototype/enginev2/session.go +++ b/sw-block/prototype/enginev2/session.go @@ -58,6 +58,11 @@ type RecoverySession struct { StartLSN uint64 // gap start (exclusive) TargetLSN uint64 // gap end (inclusive) RecoveredTo uint64 // highest LSN recovered so far + + // Truncation tracking: set when replica has divergent tail beyond committed. + TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake + TruncateToLSN uint64 // truncate entries beyond this LSN + TruncateRecorded bool // true after truncation is confirmed } func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession { diff --git a/sw-block/prototype/enginev2/walhistory.go b/sw-block/prototype/enginev2/walhistory.go new file mode 100644 index 000000000..b4e771ed8 --- /dev/null +++ b/sw-block/prototype/enginev2/walhistory.go @@ -0,0 +1,146 @@ +package enginev2 + +import "fmt" + +// WALEntry represents a single write in the WAL history. +type WALEntry struct { + LSN uint64 + Epoch uint64 + Block uint64 + Value uint64 +} + +// WALHistory is a minimal retained-prefix model for proving recoverability. +// It tracks which LSN ranges are available for catch-up and which have been +// recycled (requiring rebuild). This is the data model behind +// ClassifyRecoveryOutcome — it makes "why recovery is allowed" executable. +type WALHistory struct { + entries []WALEntry + headLSN uint64 // highest LSN written + tailLSN uint64 // oldest retained LSN (exclusive: entries with LSN > tailLSN are kept) + committedLSN uint64 // lineage-safe boundary +} + +// NewWALHistory creates an empty WAL history. +func NewWALHistory() *WALHistory { + return &WALHistory{} +} + +// Append adds a WAL entry. LSN must be strictly greater than headLSN. +func (w *WALHistory) Append(entry WALEntry) error { + if entry.LSN <= w.headLSN { + return fmt.Errorf("WAL: append LSN %d <= head %d", entry.LSN, w.headLSN) + } + w.entries = append(w.entries, entry) + w.headLSN = entry.LSN + return nil +} + +// Commit advances the committed boundary. Must be <= headLSN. +func (w *WALHistory) Commit(lsn uint64) error { + if lsn > w.headLSN { + return fmt.Errorf("WAL: commit LSN %d > head %d", lsn, w.headLSN) + } + if lsn > w.committedLSN { + w.committedLSN = lsn + } + return nil +} + +// AdvanceTail recycles entries at or below lsn. After this, entries with +// LSN <= lsn are no longer available for catch-up recovery. +func (w *WALHistory) AdvanceTail(lsn uint64) { + if lsn <= w.tailLSN { + return + } + w.tailLSN = lsn + // Remove recycled entries from storage. + kept := w.entries[:0] + for _, e := range w.entries { + if e.LSN > lsn { + kept = append(kept, e) + } + } + w.entries = kept +} + +// Truncate removes entries with LSN > afterLSN. Used to clean divergent +// tail on a replica that is ahead of the committed boundary. +func (w *WALHistory) Truncate(afterLSN uint64) { + kept := w.entries[:0] + for _, e := range w.entries { + if e.LSN <= afterLSN { + kept = append(kept, e) + } + } + w.entries = kept + if afterLSN < w.headLSN { + w.headLSN = afterLSN + } +} + +// EntriesInRange returns entries with startExclusive < LSN <= endInclusive. +// Returns nil if any entry in the range has been recycled. +func (w *WALHistory) EntriesInRange(startExclusive, endInclusive uint64) ([]WALEntry, error) { + if startExclusive < w.tailLSN { + return nil, fmt.Errorf("WAL: range start %d < tail %d (recycled)", startExclusive, w.tailLSN) + } + var result []WALEntry + for _, e := range w.entries { + if e.LSN <= startExclusive { + continue + } + if e.LSN > endInclusive { + break + } + result = append(result, e) + } + return result, nil +} + +// IsRecoverable checks whether all entries from startExclusive+1 to +// endInclusive are retained in the WAL. This is the executable proof +// of "why catch-up is allowed." +func (w *WALHistory) IsRecoverable(startExclusive, endInclusive uint64) bool { + return startExclusive >= w.tailLSN +} + +// MakeHandshakeResult generates a HandshakeResult from the WAL state +// and a replica's reported flushed LSN. This connects the data model +// to the outcome classification. +func (w *WALHistory) MakeHandshakeResult(replicaFlushedLSN uint64) HandshakeResult { + retentionStart := w.tailLSN + 1 // first available LSN + if w.tailLSN == 0 { + retentionStart = 0 // all history retained + } + return HandshakeResult{ + ReplicaFlushedLSN: replicaFlushedLSN, + CommittedLSN: w.committedLSN, + RetentionStartLSN: retentionStart, + } +} + +// HeadLSN returns the highest LSN written. +func (w *WALHistory) HeadLSN() uint64 { return w.headLSN } + +// TailLSN returns the oldest retained LSN boundary. +func (w *WALHistory) TailLSN() uint64 { return w.tailLSN } + +// CommittedLSN returns the lineage-safe committed boundary. +func (w *WALHistory) CommittedLSN() uint64 { return w.committedLSN } + +// Len returns the number of retained entries. +func (w *WALHistory) Len() int { return len(w.entries) } + +// StateAt replays entries up to lsn and returns block→value state. +// Used to verify historical data correctness after recovery. +func (w *WALHistory) StateAt(lsn uint64) map[uint64]uint64 { + state := map[uint64]uint64{} + for _, e := range w.entries { + if e.LSN > lsn { + break + } + state[e.Block] = e.Value + } + return state +}