Browse Source

feat: add WAL history model and recoverability proof (Phase 04 P3)

Adds minimal historical-data prototype to enginev2:

- WALHistory: retained-prefix model with Append, Commit, AdvanceTail,
  Truncate, EntriesInRange, IsRecoverable, StateAt
- MakeHandshakeResult connects WAL state to outcome classification
- RecordTruncation execution API for divergent tail cleanup
- CompleteSessionByID gates on truncation when required
- Zero-gap requires exact equality (FlushedLSN == CommittedLSN)
- Replica-ahead classified as CatchUp with mandatory truncation

15 new tests: WAL basics, provable recoverability, unprovable gap,
exact boundary, truncation enforcement, WAL-backed end-to-end
recovery with data verification.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 1 day ago
parent
commit
c89709e47e
  1. 17
      sw-block/prototype/enginev2/outcome.go
  2. 18
      sw-block/prototype/enginev2/p2_test.go
  3. 419
      sw-block/prototype/enginev2/p3_test.go
  4. 34
      sw-block/prototype/enginev2/sender.go
  5. 5
      sw-block/prototype/enginev2/session.go
  6. 146
      sw-block/prototype/enginev2/walhistory.go

17
sw-block/prototype/enginev2/outcome.go

@@ -20,16 +20,21 @@ const (
// ClassifyRecoveryOutcome determines the recovery path from handshake data.
//
// Uses CommittedLSN (not WAL head) as the target boundary. This is the
// lineage-safe recovery point — only acknowledged data counts. A replica
// with FlushedLSN > CommittedLSN has divergent/uncommitted tail that must
// NOT be treated as "already in sync."
// lineage-safe recovery point — only acknowledged data counts.
//
// Decision matrix (matches CP13-5 gap analysis):
// - ReplicaFlushedLSN >= CommittedLSN → zero gap, has full committed prefix
// Zero-gap requires exact equality (ReplicaFlushedLSN == CommittedLSN).
// A replica with FlushedLSN > CommittedLSN has divergent/uncommitted tail
// that requires truncation before InSync — this prototype does not model
// truncation, so that case is classified as CatchUp (the catch-up path
// will set the correct range and the completion check ensures convergence
// to CommittedLSN exactly).
//
// Decision matrix:
// - ReplicaFlushedLSN == CommittedLSN → zero gap, exact match
// - ReplicaFlushedLSN+1 >= RetentionStartLSN → recoverable via WAL catch-up
// - otherwise → gap too large, needs rebuild
func ClassifyRecoveryOutcome(result HandshakeResult) RecoveryOutcome {
if result.ReplicaFlushedLSN >= result.CommittedLSN {
if result.ReplicaFlushedLSN == result.CommittedLSN {
return OutcomeZeroGap
}
if result.RetentionStartLSN == 0 || result.ReplicaFlushedLSN+1 >= result.RetentionStartLSN {

18
sw-block/prototype/enginev2/p2_test.go

@@ -19,17 +19,21 @@ func TestOutcome_ZeroGap(t *testing.T) {
}
}
func TestOutcome_ZeroGap_ReplicaAtCommitted(t *testing.T) {
// Replica has exactly the committed prefix — zero gap.
// Note: replica may have uncommitted tail beyond CommittedLSN;
// that is handled by truncation, not by recovery classification.
func TestOutcome_ReplicaAhead_NotZeroGap(t *testing.T) {
// Replica FlushedLSN > CommittedLSN — has divergent/uncommitted tail.
// Must NOT be classified as zero-gap (would skip truncation).
// Classified as CatchUp because the committed prefix is present and
// the "ahead" tail needs explicit handling before InSync.
o := ClassifyRecoveryOutcome(HandshakeResult{
ReplicaFlushedLSN: 100,
ReplicaFlushedLSN: 105,
CommittedLSN: 100,
RetentionStartLSN: 50,
})
if o != OutcomeZeroGap {
t.Fatalf("got %s, want zero_gap", o)
if o == OutcomeZeroGap {
t.Fatal("replica ahead of committed must NOT be zero_gap")
}
if o != OutcomeCatchUp {
t.Fatalf("got %s, want catchup", o)
}
}

419
sw-block/prototype/enginev2/p3_test.go

@@ -0,0 +1,419 @@
package enginev2
import "testing"
// ============================================================
// Phase 04 P3: Historical data model and recoverability proof
// ============================================================

// --- WALHistory basics ---

// Appending entries advances the head; Commit advances the committed boundary.
func TestWAL_AppendAndCommit(t *testing.T) {
	w := NewWALHistory()
	w.Append(WALEntry{LSN: 1, Epoch: 1, Block: 1, Value: 10})
	w.Append(WALEntry{LSN: 2, Epoch: 1, Block: 2, Value: 20})
	w.Commit(2)
	if w.HeadLSN() != 2 {
		t.Fatalf("head=%d, want 2", w.HeadLSN())
	}
	if w.CommittedLSN() != 2 {
		t.Fatalf("committed=%d, want 2", w.CommittedLSN())
	}
}

// LSNs must be strictly increasing; appending at or below head is an error.
func TestWAL_AppendRejectsRegression(t *testing.T) {
	w := NewWALHistory()
	w.Append(WALEntry{LSN: 5})
	if err := w.Append(WALEntry{LSN: 3}); err == nil {
		t.Fatal("should reject LSN regression")
	}
}

// The committed boundary can never move past the highest written LSN.
func TestWAL_CommitRejectsAheadOfHead(t *testing.T) {
	w := NewWALHistory()
	w.Append(WALEntry{LSN: 5})
	if err := w.Commit(10); err == nil {
		t.Fatal("should reject commit ahead of head")
	}
}

// AdvanceTail recycles the old prefix: recycled ranges become
// unavailable while the retained suffix stays readable.
func TestWAL_AdvanceTail_RecyclesEntries(t *testing.T) {
	w := NewWALHistory()
	for i := uint64(1); i <= 10; i++ {
		w.Append(WALEntry{LSN: i, Block: i, Value: i * 10})
	}
	w.Commit(10)
	w.AdvanceTail(5) // recycle LSNs 1-5
	if w.TailLSN() != 5 {
		t.Fatalf("tail=%d, want 5", w.TailLSN())
	}
	if w.Len() != 5 {
		t.Fatalf("retained=%d, want 5", w.Len())
	}
	// Entries 1-5 gone.
	_, err := w.EntriesInRange(0, 5)
	if err == nil {
		t.Fatal("recycled range should return error")
	}
	// Entries 6-10 available.
	entries, err := w.EntriesInRange(5, 10)
	if err != nil {
		t.Fatalf("retained range should work: %v", err)
	}
	if len(entries) != 5 {
		t.Fatalf("entries=%d, want 5", len(entries))
	}
}

// Truncate drops the uncommitted tail above the given LSN and pulls
// the head back to it.
func TestWAL_Truncate_RemovesDivergentTail(t *testing.T) {
	w := NewWALHistory()
	for i := uint64(1); i <= 10; i++ {
		w.Append(WALEntry{LSN: i, Block: i, Value: i * 10})
	}
	w.Commit(7)   // committed up to 7, entries 8-10 are uncommitted
	w.Truncate(7) // remove divergent tail
	if w.HeadLSN() != 7 {
		t.Fatalf("head=%d after truncate, want 7", w.HeadLSN())
	}
	if w.Len() != 7 {
		t.Fatalf("retained=%d after truncate, want 7", w.Len())
	}
}

// StateAt replays history to a point in time; a later write to the
// same block must overwrite the earlier one at that point.
func TestWAL_StateAt_HistoricalCorrectness(t *testing.T) {
	w := NewWALHistory()
	w.Append(WALEntry{LSN: 1, Block: 1, Value: 100})
	w.Append(WALEntry{LSN: 2, Block: 1, Value: 200}) // overwrites block 1
	w.Append(WALEntry{LSN: 3, Block: 2, Value: 300})
	// State at LSN 1: block 1 = 100.
	s1 := w.StateAt(1)
	if s1[1] != 100 {
		t.Fatalf("state@1: block 1 = %d, want 100", s1[1])
	}
	// State at LSN 2: block 1 = 200 (overwritten).
	s2 := w.StateAt(2)
	if s2[1] != 200 {
		t.Fatalf("state@2: block 1 = %d, want 200", s2[1])
	}
	// State at LSN 3: block 1 = 200, block 2 = 300.
	s3 := w.StateAt(3)
	if s3[1] != 200 || s3[2] != 300 {
		t.Fatalf("state@3: %v", s3)
	}
}
// --- Recoverability proof: "why is catch-up allowed?" ---

// Gap entirely inside the retained window: every missing entry can be
// served from the WAL, so the classified outcome must be catch-up.
func TestRecoverability_Provable_GapWithinRetention(t *testing.T) {
	primary := NewWALHistory()
	for i := uint64(1); i <= 100; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 10, Value: i})
	}
	primary.Commit(100)
	primary.AdvanceTail(30) // retain LSNs 31-100
	// Replica at LSN 50 — gap is 51-100, within retention.
	if !primary.IsRecoverable(50, 100) {
		t.Fatal("gap 50→100 should be recoverable (tail=30)")
	}
	entries, err := primary.EntriesInRange(50, 100)
	if err != nil {
		t.Fatalf("entries should be available: %v", err)
	}
	if len(entries) != 50 {
		t.Fatalf("entries=%d, want 50", len(entries))
	}
	// Handshake result proves recoverability.
	hr := primary.MakeHandshakeResult(50)
	outcome := ClassifyRecoveryOutcome(hr)
	if outcome != OutcomeCatchUp {
		t.Fatalf("outcome=%s, want catchup", outcome)
	}
}

// Gap reaching into the recycled prefix: the entries are gone, so the
// only safe outcome is a full rebuild.
func TestRecoverability_Unprovable_GapBeyondRetention(t *testing.T) {
	primary := NewWALHistory()
	for i := uint64(1); i <= 100; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 10, Value: i})
	}
	primary.Commit(100)
	primary.AdvanceTail(60) // retain only LSNs 61-100
	// Replica at LSN 50 — gap is 51-100, but 51-60 are recycled.
	if primary.IsRecoverable(50, 100) {
		t.Fatal("gap 50→100 should NOT be recoverable (tail=60)")
	}
	_, err := primary.EntriesInRange(50, 100)
	if err == nil {
		t.Fatal("recycled entries should return error")
	}
	hr := primary.MakeHandshakeResult(50)
	outcome := ClassifyRecoveryOutcome(hr)
	if outcome != OutcomeNeedsRebuild {
		t.Fatalf("outcome=%s, want needs_rebuild", outcome)
	}
}

// The recoverability boundary is exact: a gap starting at the tail LSN
// is recoverable, one LSN earlier is not.
func TestRecoverability_ExactBoundary(t *testing.T) {
	primary := NewWALHistory()
	for i := uint64(1); i <= 100; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1})
	}
	primary.Commit(100)
	primary.AdvanceTail(50) // retain 51-100
	// Replica at LSN 50 — gap starts at exactly tailLSN. Recoverable.
	if !primary.IsRecoverable(50, 100) {
		t.Fatal("exact boundary should be recoverable")
	}
	// Replica at LSN 49 — gap starts before tailLSN. NOT recoverable.
	if primary.IsRecoverable(49, 100) {
		t.Fatal("one-below boundary should NOT be recoverable")
	}
}
// --- Truncation: divergent tail handling ---

// A replica ahead of the committed boundary must truncate its divergent
// tail before the session may complete; completion is gated on the
// truncation being recorded.
func TestTruncation_ReplicaAhead_RequiresTruncateBeforeInSync(t *testing.T) {
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	// Replica is ahead of committed (divergent uncommitted tail).
	outcome, err := s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{
		ReplicaFlushedLSN: 105,
		CommittedLSN:      100,
		RetentionStartLSN: 50,
	})
	if err != nil {
		t.Fatal(err)
	}
	if outcome != OutcomeCatchUp {
		t.Fatalf("replica ahead should be catchup (needs truncation), got %s", outcome)
	}
	// Session requires truncation.
	if !sess.TruncateRequired {
		t.Fatal("session should require truncation")
	}
	if sess.TruncateToLSN != 100 {
		t.Fatalf("truncate to=%d, want 100", sess.TruncateToLSN)
	}
	// Completion WITHOUT truncation recorded — rejected.
	if s.CompleteSessionByID(sess.ID) {
		t.Fatal("completion without truncation should be rejected")
	}
	// Record truncation.
	if err := s.RecordTruncation(sess.ID, 100); err != nil {
		t.Fatalf("truncation record: %v", err)
	}
	// Now completion succeeds (zero-gap after truncation).
	if !s.CompleteSessionByID(sess.ID) {
		t.Fatal("completion after truncation should succeed")
	}
	if s.State != StateInSync {
		t.Fatalf("state=%s, want in_sync", s.State)
	}
}

// The recorded truncation must land exactly on the required LSN.
func TestTruncation_WrongLSN_Rejected(t *testing.T) {
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{
		ReplicaFlushedLSN: 105,
		CommittedLSN:      100,
		RetentionStartLSN: 50,
	})
	// Wrong truncation LSN.
	if err := s.RecordTruncation(sess.ID, 99); err == nil {
		t.Fatal("wrong truncation LSN should be rejected")
	}
}

// Recording a truncation on a session that never required one is an error.
func TestTruncation_NotRequired_Rejected(t *testing.T) {
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	// Normal gap (replica behind, not ahead).
	s.RecordHandshakeWithOutcome(sess.ID, HandshakeResult{
		ReplicaFlushedLSN: 80,
		CommittedLSN:      100,
		RetentionStartLSN: 50,
	})
	// Truncation not required.
	if err := s.RecordTruncation(sess.ID, 100); err == nil {
		t.Fatal("truncation on non-truncate session should be rejected")
	}
}
// --- End-to-end: WAL-backed recovery with data verification ---

// Full happy path: a lagging replica is brought back via WAL entries,
// and the replayed replica state matches the primary at the committed LSN.
func TestE2E_WALBacked_CatchUpRecovery(t *testing.T) {
	// Primary WAL with history.
	primary := NewWALHistory()
	for i := uint64(1); i <= 50; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
	}
	primary.Commit(50)
	primary.AdvanceTail(10) // retain 11-50
	// Replica WAL (simulates a replica that fell behind at LSN 30).
	replica := NewWALHistory()
	for i := uint64(1); i <= 30; i++ {
		replica.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
	}
	// Sender + session.
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	// Handshake with WAL-backed result.
	hr := primary.MakeHandshakeResult(30)
	outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr)
	if outcome != OutcomeCatchUp {
		t.Fatalf("outcome=%s", outcome)
	}
	// Verify the gap is provably recoverable.
	if !primary.IsRecoverable(30, 50) {
		t.Fatal("gap should be provably recoverable from WAL")
	}
	// Get catch-up entries.
	entries, err := primary.EntriesInRange(30, 50)
	if err != nil {
		t.Fatalf("catch-up entries: %v", err)
	}
	// Apply to replica.
	s.BeginCatchUp(sess.ID)
	for _, e := range entries {
		replica.Append(e)
		s.RecordCatchUpProgress(sess.ID, e.LSN)
	}
	// Complete.
	if !s.CompleteSessionByID(sess.ID) {
		t.Fatal("completion should succeed")
	}
	// Verify historical data correctness: replica state at committed LSN
	// matches primary state at committed LSN.
	primaryState := primary.StateAt(50)
	replicaState := replica.StateAt(50)
	for block, pVal := range primaryState {
		if rVal := replicaState[block]; rVal != pVal {
			t.Fatalf("block %d: primary=%d replica=%d", block, pVal, rVal)
		}
	}
	t.Logf("WAL-backed recovery: gap 30→50 proven recoverable, data verified")
}

// When the gap reaches into recycled history the sender must fall back
// to a rebuild — and the WAL itself proves why.
func TestE2E_WALBacked_RebuildFallback(t *testing.T) {
	primary := NewWALHistory()
	for i := uint64(1); i <= 100; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
	}
	primary.Commit(100)
	primary.AdvanceTail(60) // only retain 61-100
	// Sender + session.
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	// Handshake: replica at LSN 30, but retention starts at 61. Unrecoverable.
	hr := primary.MakeHandshakeResult(30)
	outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr)
	if outcome != OutcomeNeedsRebuild {
		t.Fatalf("outcome=%s, want needs_rebuild", outcome)
	}
	// WAL proves WHY rebuild is needed.
	if primary.IsRecoverable(30, 100) {
		t.Fatal("gap should NOT be recoverable")
	}
	// Sender state.
	if s.State != StateNeedsRebuild {
		t.Fatalf("state=%s", s.State)
	}
	t.Logf("WAL-backed rebuild: gap 30→100 proven unrecoverable (tail=60)")
}

// A replica ahead of the committed boundary truncates its divergent
// tail, after which the session completes and states match exactly.
func TestE2E_WALBacked_TruncateThenInSync(t *testing.T) {
	// Replica is ahead of committed — has divergent tail.
	primary := NewWALHistory()
	for i := uint64(1); i <= 100; i++ {
		primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
	}
	primary.Commit(100)
	replica := NewWALHistory()
	for i := uint64(1); i <= 105; i++ {
		replica.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
	}
	// Entries 101-105 are divergent (uncommitted on primary).
	// Sender.
	s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
	sess, _ := s.AttachSession(1, SessionCatchUp)
	s.BeginConnect(sess.ID)
	hr := primary.MakeHandshakeResult(105)
	outcome, _ := s.RecordHandshakeWithOutcome(sess.ID, hr)
	if outcome != OutcomeCatchUp {
		t.Fatalf("outcome=%s", outcome)
	}
	if !sess.TruncateRequired {
		t.Fatal("truncation should be required")
	}
	// Truncate replica's divergent tail.
	replica.Truncate(100)
	if replica.HeadLSN() != 100 {
		t.Fatalf("replica head after truncate=%d, want 100", replica.HeadLSN())
	}
	// Record truncation in session.
	s.RecordTruncation(sess.ID, 100)
	// Complete (zero-gap after truncation).
	if !s.CompleteSessionByID(sess.ID) {
		t.Fatal("completion after truncation should succeed")
	}
	// Verify data matches at committed boundary.
	primaryState := primary.StateAt(100)
	replicaState := replica.StateAt(100)
	for block, pVal := range primaryState {
		if rVal := replicaState[block]; rVal != pVal {
			t.Fatalf("block %d: primary=%d replica=%d", block, pVal, rVal)
		}
	}
	t.Logf("truncate-then-InSync: divergent tail removed, data verified at committed=100")
}

34
sw-block/prototype/enginev2/sender.go

@@ -158,6 +158,10 @@ func (s *Sender) CompleteSessionByID(sessionID uint64) bool {
return false
}
sess := s.session
// Truncation gate: if truncation was required, it must be recorded.
if sess.TruncateRequired && !sess.TruncateRecorded {
return false
}
switch sess.Phase {
case PhaseCatchUp:
if !sess.Converged() {
@@ -255,11 +259,39 @@ func (s *Sender) RecordHandshakeWithOutcome(sessionID uint64, result HandshakeRe
case OutcomeZeroGap:
s.session.SetRange(result.ReplicaFlushedLSN, result.ReplicaFlushedLSN)
case OutcomeCatchUp:
s.session.SetRange(result.ReplicaFlushedLSN, result.CommittedLSN)
if result.ReplicaFlushedLSN > result.CommittedLSN {
// Replica ahead of committed — divergent tail needs truncation.
s.session.TruncateRequired = true
s.session.TruncateToLSN = result.CommittedLSN
// Catch-up range is zero after truncation (already has committed prefix).
s.session.SetRange(result.CommittedLSN, result.CommittedLSN)
} else {
s.session.SetRange(result.ReplicaFlushedLSN, result.CommittedLSN)
}
}
return outcome, nil
}
// RecordTruncation confirms that the replica's divergent tail has been
// truncated down to the required LSN. Must be called before completion
// when session.TruncateRequired is true (CompleteSessionByID gates on
// TruncateRecorded).
// Rejects: wrong sessionID, truncation not required, truncation already
// recorded, LSN mismatch.
func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.checkSessionAuthority(sessionID); err != nil {
		return err
	}
	sess := s.session
	if !sess.TruncateRequired {
		return fmt.Errorf("truncation not required for this session")
	}
	// Enforce the documented "already recorded" rejection: a duplicate
	// confirmation would otherwise be silently accepted and could mask a
	// protocol error on the replica side.
	if sess.TruncateRecorded {
		return fmt.Errorf("truncation already recorded for this session")
	}
	if truncatedToLSN != sess.TruncateToLSN {
		return fmt.Errorf("truncation LSN mismatch: expected %d, got %d",
			sess.TruncateToLSN, truncatedToLSN)
	}
	sess.TruncateRecorded = true
	return nil
}
// BeginCatchUp transitions the session from handshake to catch-up phase.
// Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp.
// Rejects: wrong sessionID, wrong phase.

5
sw-block/prototype/enginev2/session.go

@@ -58,6 +58,11 @@ type RecoverySession struct {
StartLSN uint64 // gap start (exclusive)
TargetLSN uint64 // gap end (inclusive)
RecoveredTo uint64 // highest LSN recovered so far
// Truncation tracking: set when replica has divergent tail beyond committed.
TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake
TruncateToLSN uint64 // truncate entries beyond this LSN
TruncateRecorded bool // true after truncation is confirmed
}
func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession {

146
sw-block/prototype/enginev2/walhistory.go

@@ -0,0 +1,146 @@
package enginev2
import "fmt"
// WALEntry represents a single write in the WAL history.
type WALEntry struct {
	LSN   uint64 // monotonically increasing position in the log
	Epoch uint64 // leadership epoch the entry was written under
	Block uint64 // block (key) the write targets
	Value uint64 // value stored into the block
}

// WALHistory is a minimal retained-prefix model for proving recoverability.
// It tracks which LSN ranges are available for catch-up and which have been
// recycled (requiring rebuild). This is the data model behind
// ClassifyRecoveryOutcome — it makes "why recovery is allowed" executable.
//
// Invariants maintained by the mutating methods:
//   - entries are sorted by strictly increasing LSN (enforced by Append)
//   - committedLSN <= headLSN (enforced by Commit)
//   - entries with LSN <= tailLSN have been recycled and are unavailable
type WALHistory struct {
	entries      []WALEntry
	headLSN      uint64 // highest LSN written
	tailLSN      uint64 // oldest retained LSN (exclusive: entries with LSN > tailLSN are kept)
	committedLSN uint64 // lineage-safe boundary
}

// NewWALHistory creates an empty WAL history.
func NewWALHistory() *WALHistory {
	return &WALHistory{}
}

// Append adds a WAL entry. LSN must be strictly greater than headLSN,
// which keeps entries sorted and duplicate-free.
func (w *WALHistory) Append(entry WALEntry) error {
	if entry.LSN <= w.headLSN {
		return fmt.Errorf("WAL: append LSN %d <= head %d", entry.LSN, w.headLSN)
	}
	w.entries = append(w.entries, entry)
	w.headLSN = entry.LSN
	return nil
}

// Commit advances the committed boundary. Must be <= headLSN.
// Commits are monotonic: an LSN at or below the current boundary is a no-op.
func (w *WALHistory) Commit(lsn uint64) error {
	if lsn > w.headLSN {
		return fmt.Errorf("WAL: commit LSN %d > head %d", lsn, w.headLSN)
	}
	if lsn > w.committedLSN {
		w.committedLSN = lsn
	}
	return nil
}

// AdvanceTail recycles entries at or below lsn. After this, entries with
// LSN <= lsn are no longer available for catch-up recovery.
// NOTE(review): there is no clamp against committedLSN — advancing the
// tail past the committed boundary would discard committed history;
// callers are trusted not to do that in this prototype.
func (w *WALHistory) AdvanceTail(lsn uint64) {
	if lsn <= w.tailLSN {
		return // the tail never moves backwards
	}
	w.tailLSN = lsn
	// Remove recycled entries in place, reusing the backing array.
	kept := w.entries[:0]
	for _, e := range w.entries {
		if e.LSN > lsn {
			kept = append(kept, e)
		}
	}
	w.entries = kept
}

// Truncate removes entries with LSN > afterLSN and pulls the head back
// to afterLSN. Used to clean divergent tail on a replica that is ahead
// of the committed boundary.
// NOTE(review): truncating below committedLSN would leave committedLSN
// ahead of the head; intended usage truncates at the committed boundary.
func (w *WALHistory) Truncate(afterLSN uint64) {
	kept := w.entries[:0]
	for _, e := range w.entries {
		if e.LSN <= afterLSN {
			kept = append(kept, e)
		}
	}
	w.entries = kept
	if afterLSN < w.headLSN {
		w.headLSN = afterLSN
	}
}

// EntriesInRange returns entries with startExclusive < LSN <= endInclusive.
// Returns an error if any entry in the range has been recycled or was
// never written, so callers never receive a silently incomplete range.
func (w *WALHistory) EntriesInRange(startExclusive, endInclusive uint64) ([]WALEntry, error) {
	if startExclusive < w.tailLSN {
		return nil, fmt.Errorf("WAL: range start %d < tail %d (recycled)", startExclusive, w.tailLSN)
	}
	if endInclusive > w.headLSN {
		return nil, fmt.Errorf("WAL: range end %d > head %d (not written)", endInclusive, w.headLSN)
	}
	var result []WALEntry
	for _, e := range w.entries {
		if e.LSN <= startExclusive {
			continue
		}
		if e.LSN > endInclusive {
			break // entries are LSN-sorted; nothing later can qualify
		}
		result = append(result, e)
	}
	return result, nil
}

// IsRecoverable checks whether all entries from startExclusive+1 to
// endInclusive are retained in the WAL. This is the executable proof
// of "why catch-up is allowed."
//
// Both bounds matter: the gap must start at or after the retained tail
// AND end at or before the written head — an end beyond headLSN refers
// to entries that do not exist in this WAL, and previously was wrongly
// reported as recoverable.
func (w *WALHistory) IsRecoverable(startExclusive, endInclusive uint64) bool {
	return startExclusive >= w.tailLSN && endInclusive <= w.headLSN
}
// MakeHandshakeResult generates a HandshakeResult from the WAL state
// and a replica's reported flushed LSN. This connects the data model
// to the outcome classification (ClassifyRecoveryOutcome).
//
// RetentionStartLSN is the first LSN still retained (tailLSN+1). The
// sentinel value 0 means "all history retained": ClassifyRecoveryOutcome
// treats RetentionStartLSN == 0 as no retention limit.
func (w *WALHistory) MakeHandshakeResult(replicaFlushedLSN uint64) HandshakeResult {
	retentionStart := w.tailLSN + 1 // first available LSN
	if w.tailLSN == 0 {
		// Nothing has been recycled yet — signal unlimited retention.
		retentionStart = 0
	}
	return HandshakeResult{
		ReplicaFlushedLSN: replicaFlushedLSN,
		CommittedLSN:      w.committedLSN,
		RetentionStartLSN: retentionStart,
	}
}
// HeadLSN returns the highest LSN written.
func (w *WALHistory) HeadLSN() uint64 { return w.headLSN }

// TailLSN returns the oldest retained LSN boundary (exclusive: entries
// with LSN > TailLSN are retained).
func (w *WALHistory) TailLSN() uint64 { return w.tailLSN }

// CommittedLSN returns the lineage-safe committed boundary.
func (w *WALHistory) CommittedLSN() uint64 { return w.committedLSN }

// Len returns the number of retained (non-recycled) entries.
func (w *WALHistory) Len() int { return len(w.entries) }
// StateAt replays entries up to lsn and returns the resulting
// block→value state. Used to verify historical data correctness after
// recovery.
// NOTE(review): only retained entries are replayed — writes recycled by
// AdvanceTail no longer contribute to the reconstructed state.
func (w *WALHistory) StateAt(lsn uint64) map[uint64]uint64 {
	snapshot := make(map[uint64]uint64)
	for i := range w.entries {
		entry := w.entries[i]
		if entry.LSN > lsn {
			// Entries are LSN-sorted, so no later entry can qualify.
			break
		}
		snapshot[entry.Block] = entry.Value
	}
	return snapshot
}
Loading…
Cancel
Save