Browse Source

fix: correct catch-up entry counting and rebuild transfer gate

Entry counting:
- Session.setRange now initializes recoveredTo = startLSN
- RecordCatchUpProgress delta counts only actual catch-up work
  (recoveredTo - startLSN), not the replica's pre-existing prefix

Rebuild transfer gate:
- BeginTailReplay requires TransferredTo >= SnapshotLSN
- Prevents tail replay on incomplete base transfer

3 new regression tests:
- BudgetEntries_NonZeroStart_CountsOnlyDelta (30 entries within 50 budget)
- BudgetEntries_NonZeroStart_ExceedsBudget (30 entries exceeds 20 budget)
- Rebuild_PartialTransfer_BlocksTailReplay

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
368a956aee
  1. 6
      sw-block/engine/replication/rebuild.go
  2. 71
      sw-block/engine/replication/recovery_test.go
  3. 4
      sw-block/engine/replication/session.go

6
sw-block/engine/replication/rebuild.go

@ -85,6 +85,12 @@ func (rs *RebuildState) BeginTailReplay() error {
if rs.Source != RebuildSnapshotTail {
return fmt.Errorf("rebuild: tail replay only for snapshot_tail")
}
// Gate: base transfer must have reached at least the snapshot LSN.
// Without this, tail replay could start on an incomplete base.
if rs.TransferredTo < rs.SnapshotLSN {
return fmt.Errorf("rebuild: base transfer incomplete (%d < snapshot %d)",
rs.TransferredTo, rs.SnapshotLSN)
}
rs.Phase = RebuildPhaseTailReplay
return nil
}

71
sw-block/engine/replication/recovery_test.go

@ -197,6 +197,77 @@ func TestRecovery_BudgetStall_Escalates(t *testing.T) {
}
}
func TestRecovery_BudgetEntries_NonZeroStart_CountsOnlyDelta(t *testing.T) {
// Replica starts at LSN 70. Catching up to 100 = 30 entries.
// With MaxEntries=50, this should NOT exceed budget.
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
id, _ := s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxEntries: 50}))
s.BeginConnect(id)
s.RecordHandshakeWithOutcome(id, HandshakeResult{
ReplicaFlushedLSN: 70, CommittedLSN: 100, RetentionStartLSN: 50,
})
s.BeginCatchUp(id)
// Progress from 70 → 100 = 30 entries (not 100).
s.RecordCatchUpProgress(id, 100)
v, _ := s.CheckBudget(id, 0)
if v != BudgetOK {
t.Fatalf("30 entries should be within 50-entry budget, got %s", v)
}
// Verify: only 30 entries counted, not 100.
snap := s.SessionSnapshot()
if snap == nil {
t.Fatal("session should exist")
}
}
func TestRecovery_BudgetEntries_NonZeroStart_ExceedsBudget(t *testing.T) {
// Replica starts at LSN 70. Catching up to 100 = 30 entries.
// With MaxEntries=20, this SHOULD exceed budget.
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
id, _ := s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxEntries: 20}))
s.BeginConnect(id)
s.RecordHandshakeWithOutcome(id, HandshakeResult{
ReplicaFlushedLSN: 70, CommittedLSN: 100, RetentionStartLSN: 50,
})
s.BeginCatchUp(id)
s.RecordCatchUpProgress(id, 100) // 30 entries > 20 limit
v, _ := s.CheckBudget(id, 0)
if v != BudgetEntriesExceeded {
t.Fatalf("30 entries should exceed 20-entry budget, got %s", v)
}
}
func TestRecovery_Rebuild_PartialTransfer_BlocksTailReplay(t *testing.T) {
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
id, _ := s.AttachSession(1, SessionRebuild)
s.BeginConnect(id)
s.RecordHandshake(id, 0, 100)
s.SelectRebuildSource(id, 50, true, 100) // snapshot at LSN 50
s.BeginRebuildTransfer(id)
s.RecordRebuildTransferProgress(id, 30) // only transferred to 30 < snapshot 50
// Tail replay should be blocked: base transfer incomplete.
if err := s.BeginRebuildTailReplay(id); err == nil {
t.Fatal("tail replay should be blocked when transfer < snapshotLSN")
}
// Complete transfer to snapshot LSN.
s.RecordRebuildTransferProgress(id, 50)
// Now tail replay allowed.
if err := s.BeginRebuildTailReplay(id); err != nil {
t.Fatalf("tail replay should work after full transfer: %v", err)
}
}
func TestRecovery_CompletionBeforeConvergence_Rejected(t *testing.T) {
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
id, _ := s.AttachSession(1, SessionCatchUp)

4
sw-block/engine/replication/session.go

@ -84,6 +84,10 @@ func (s *Session) advance(phase SessionPhase) bool {
func (s *Session) setRange(start, target uint64) {
s.startLSN = start
s.targetLSN = target
// Initialize recoveredTo to startLSN so that delta-based entry counting
// in RecordCatchUpProgress measures only the actual catch-up work,
// not the replica's pre-existing prefix.
s.recoveredTo = start
}
func (s *Session) updateProgress(recoveredTo uint64) {

Loading…
Cancel
Save