diff --git a/sw-block/engine/replication/rebuild.go b/sw-block/engine/replication/rebuild.go index abee0cb37..d82da9c69 100644 --- a/sw-block/engine/replication/rebuild.go +++ b/sw-block/engine/replication/rebuild.go @@ -85,6 +85,12 @@ func (rs *RebuildState) BeginTailReplay() error { if rs.Source != RebuildSnapshotTail { return fmt.Errorf("rebuild: tail replay only for snapshot_tail") } + // Gate: base transfer must have reached at least the snapshot LSN. + // Without this, tail replay could start on an incomplete base. + if rs.TransferredTo < rs.SnapshotLSN { + return fmt.Errorf("rebuild: base transfer incomplete (%d < snapshot %d)", + rs.TransferredTo, rs.SnapshotLSN) + } rs.Phase = RebuildPhaseTailReplay return nil } diff --git a/sw-block/engine/replication/recovery_test.go b/sw-block/engine/replication/recovery_test.go index 87dfc3918..247fe74bb 100644 --- a/sw-block/engine/replication/recovery_test.go +++ b/sw-block/engine/replication/recovery_test.go @@ -197,6 +197,77 @@ func TestRecovery_BudgetStall_Escalates(t *testing.T) { } } +func TestRecovery_BudgetEntries_NonZeroStart_CountsOnlyDelta(t *testing.T) { + // Replica starts at LSN 70. Catching up to 100 = 30 entries. + // With MaxEntries=50, this should NOT exceed budget. + s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + id, _ := s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxEntries: 50})) + + s.BeginConnect(id) + s.RecordHandshakeWithOutcome(id, HandshakeResult{ + ReplicaFlushedLSN: 70, CommittedLSN: 100, RetentionStartLSN: 50, + }) + s.BeginCatchUp(id) + + // Progress from 70 → 100 = 30 entries (not 100). + s.RecordCatchUpProgress(id, 100) + + v, _ := s.CheckBudget(id, 0) + if v != BudgetOK { + t.Fatalf("30 entries should be within 50-entry budget, got %s", v) + } + + // Verify: only 30 entries counted, not 100. + snap := s.SessionSnapshot() + if snap == nil { + t.Fatal("session should exist") + } +} + +func TestRecovery_BudgetEntries_NonZeroStart_ExceedsBudget(t *testing.T) { + // Replica starts at LSN 70. Catching up to 100 = 30 entries. + // With MaxEntries=20, this SHOULD exceed budget. + s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + id, _ := s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxEntries: 20})) + + s.BeginConnect(id) + s.RecordHandshakeWithOutcome(id, HandshakeResult{ + ReplicaFlushedLSN: 70, CommittedLSN: 100, RetentionStartLSN: 50, + }) + s.BeginCatchUp(id) + s.RecordCatchUpProgress(id, 100) // 30 entries > 20 limit + + v, _ := s.CheckBudget(id, 0) + if v != BudgetEntriesExceeded { + t.Fatalf("30 entries should exceed 20-entry budget, got %s", v) + } +} + +func TestRecovery_Rebuild_PartialTransfer_BlocksTailReplay(t *testing.T) { + s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + id, _ := s.AttachSession(1, SessionRebuild) + + s.BeginConnect(id) + s.RecordHandshake(id, 0, 100) + s.SelectRebuildSource(id, 50, true, 100) // snapshot at LSN 50 + + s.BeginRebuildTransfer(id) + s.RecordRebuildTransferProgress(id, 30) // only transferred to 30 < snapshot 50 + + // Tail replay should be blocked: base transfer incomplete. + if err := s.BeginRebuildTailReplay(id); err == nil { + t.Fatal("tail replay should be blocked when transfer < snapshotLSN") + } + + // Complete transfer to snapshot LSN. + s.RecordRebuildTransferProgress(id, 50) + + // Now tail replay allowed. + if err := s.BeginRebuildTailReplay(id); err != nil { + t.Fatalf("tail replay should work after full transfer: %v", err) + } +} + func TestRecovery_CompletionBeforeConvergence_Rejected(t *testing.T) { s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) id, _ := s.AttachSession(1, SessionCatchUp) diff --git a/sw-block/engine/replication/session.go b/sw-block/engine/replication/session.go index 4613411df..9a4062bf1 100644 --- a/sw-block/engine/replication/session.go +++ b/sw-block/engine/replication/session.go @@ -84,6 +84,10 @@ func (s *Session) advance(phase SessionPhase) bool { func (s *Session) setRange(start, target uint64) { s.startLSN = start s.targetLSN = target + // Initialize recoveredTo to startLSN so that delta-based entry counting + // in RecordCatchUpProgress measures only the actual catch-up work, + // not the replica's pre-existing prefix. + s.recoveredTo = start } func (s *Session) updateProgress(recoveredTo uint64) {