Browse Source

feat: add bounded CatchUp budget and Rebuild mode state machine (Phase 4.5 P0)

Bounded CatchUp:
- CatchUpBudget: MaxDurationTicks, MaxEntries, ProgressDeadlineTicks
- BudgetCheck: runtime consumption tracker (StartTick, EntriesReplayed, LastProgressTick)
- Sender.CheckBudget: evaluates budget, escalates to NeedsRebuild on violation
- RecordCatchUpProgressAt: tracks progress tick for stall detection
- BeginCatchUp accepts optional startTick for budget tracking

Rebuild state machine:
- RebuildSource: snapshot_tail (preferred) vs full_base (fallback)
- RebuildPhase: init → source_select → transfer → tail_replay → completed|aborted
- SelectSource: chooses based on snapshot availability
- Phase ordering enforced, transfer regression rejected
- ReadyToComplete validates target reached

13 new tests: budget enforcement (duration, entries, stall, no-budget),
sender budget integration, rebuild lifecycle (snapshot+tail, full base,
abort, phase order, regression), E2E bounded catch-up → rebuild.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
3f0048cbd9
  1. 59
      sw-block/prototype/enginev2/budget.go
  2. 306
      sw-block/prototype/enginev2/phase45_test.go
  3. 152
      sw-block/prototype/enginev2/rebuild.go
  4. 52
      sw-block/prototype/enginev2/sender.go
  5. 7
      sw-block/prototype/enginev2/session.go

59
sw-block/prototype/enginev2/budget.go

@ -0,0 +1,59 @@
package enginev2
// CatchUpBudget defines the bounded resource contract for a catch-up session.
// CatchUp is a short-gap, bounded recovery path. When any budget limit is
// exceeded, the session must escalate to NeedsRebuild rather than continuing
// indefinitely.
//
// A zero value for any field means "no limit" for that dimension.
type CatchUpBudget struct {
// TargetLSNAtStart is frozen at handshake time. The catch-up target
// does not drift — if the primary writes more, the session completes
// at the original target and then transitions to normal shipping.
TargetLSNAtStart uint64
// MaxDurationTicks is the hard time budget. If the session has not
// converged within this many ticks, it escalates.
MaxDurationTicks uint64
// MaxEntries is the maximum number of WAL entries to replay.
// Prevents a "short gap" from silently becoming a full rebuild.
MaxEntries uint64
// ProgressDeadlineTicks is the stall detection window. If no progress
// is recorded within this many ticks, the session escalates.
ProgressDeadlineTicks uint64
}
// BudgetCheck tracks runtime budget consumption for a catch-up session.
type BudgetCheck struct {
StartTick uint64 // tick when catch-up began
EntriesReplayed uint64 // total entries replayed so far
LastProgressTick uint64 // tick of last RecordCatchUpProgress call
}
// BudgetViolation identifies which budget limit was exceeded.
type BudgetViolation string
const (
BudgetOK BudgetViolation = ""
BudgetDurationExceeded BudgetViolation = "duration_exceeded"
BudgetEntriesExceeded BudgetViolation = "entries_exceeded"
BudgetProgressStalled BudgetViolation = "progress_stalled"
)
// Check evaluates the budget against the current tick. Returns BudgetOK
// if all limits are within bounds, or the specific violation.
func (b *CatchUpBudget) Check(tracker BudgetCheck, currentTick uint64) BudgetViolation {
if b.MaxDurationTicks > 0 && currentTick-tracker.StartTick > b.MaxDurationTicks {
return BudgetDurationExceeded
}
if b.MaxEntries > 0 && tracker.EntriesReplayed > b.MaxEntries {
return BudgetEntriesExceeded
}
if b.ProgressDeadlineTicks > 0 && tracker.LastProgressTick > 0 &&
currentTick-tracker.LastProgressTick > b.ProgressDeadlineTicks {
return BudgetProgressStalled
}
return BudgetOK
}

306
sw-block/prototype/enginev2/phase45_test.go

@ -0,0 +1,306 @@
package enginev2
import "testing"
// ============================================================
// Phase 4.5 P0: Bounded CatchUp + Rebuild mode state machine
// ============================================================
// --- CatchUp Budget ---
func TestBudget_DurationExceeded(t *testing.T) {
b := &CatchUpBudget{MaxDurationTicks: 10}
tracker := BudgetCheck{StartTick: 0}
if v := b.Check(tracker, 5); v != BudgetOK {
t.Fatalf("tick 5: %s", v)
}
if v := b.Check(tracker, 11); v != BudgetDurationExceeded {
t.Fatalf("tick 11: got %s, want duration_exceeded", v)
}
}
func TestBudget_EntriesExceeded(t *testing.T) {
b := &CatchUpBudget{MaxEntries: 100}
tracker := BudgetCheck{EntriesReplayed: 50}
if v := b.Check(tracker, 0); v != BudgetOK {
t.Fatalf("50 entries: %s", v)
}
tracker.EntriesReplayed = 101
if v := b.Check(tracker, 0); v != BudgetEntriesExceeded {
t.Fatalf("101 entries: got %s, want entries_exceeded", v)
}
}
func TestBudget_ProgressStalled(t *testing.T) {
b := &CatchUpBudget{ProgressDeadlineTicks: 5}
tracker := BudgetCheck{LastProgressTick: 10}
if v := b.Check(tracker, 14); v != BudgetOK {
t.Fatalf("tick 14: %s", v)
}
if v := b.Check(tracker, 16); v != BudgetProgressStalled {
t.Fatalf("tick 16: got %s, want progress_stalled", v)
}
}
func TestBudget_NoBudget_AlwaysOK(t *testing.T) {
b := &CatchUpBudget{} // all zeros = no limits
tracker := BudgetCheck{EntriesReplayed: 999999, StartTick: 0}
if v := b.Check(tracker, 999999); v != BudgetOK {
t.Fatalf("no budget: %s", v)
}
}
// --- Sender budget integration ---
func TestSender_Budget_EscalatesOnViolation(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp)
// Set budget.
sess.Budget = &CatchUpBudget{MaxDurationTicks: 10}
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 0, 100)
s.BeginCatchUp(sess.ID, 0) // start at tick 0
s.RecordCatchUpProgress(sess.ID, 10)
// Within budget at tick 5.
v, _ := s.CheckBudget(sess.ID, 5)
if v != BudgetOK {
t.Fatalf("tick 5: %s", v)
}
// Exceeded at tick 11.
v, _ = s.CheckBudget(sess.ID, 11)
if v != BudgetDurationExceeded {
t.Fatalf("tick 11: got %s, want duration_exceeded", v)
}
// Session invalidated, sender at NeedsRebuild.
if sess.Active() {
t.Fatal("session should be invalidated after budget violation")
}
if s.State != StateNeedsRebuild {
t.Fatalf("state=%s, want needs_rebuild", s.State)
}
}
func TestSender_Budget_ProgressStall_Escalates(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp)
sess.Budget = &CatchUpBudget{ProgressDeadlineTicks: 5}
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 0, 100)
s.BeginCatchUp(sess.ID, 10)
s.RecordCatchUpProgressAt(sess.ID, 20, 12) // progress at tick 12
// Stalled: no progress for 6 ticks (12→18 > deadline 5).
v, _ := s.CheckBudget(sess.ID, 18)
if v != BudgetProgressStalled {
t.Fatalf("tick 18: got %s, want progress_stalled", v)
}
if s.State != StateNeedsRebuild {
t.Fatalf("state=%s", s.State)
}
}
func TestSender_NoBudget_NeverEscalates(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp)
// No budget set.
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 0, 100)
s.BeginCatchUp(sess.ID, 0)
v, _ := s.CheckBudget(sess.ID, 999999)
if v != BudgetOK {
t.Fatalf("no budget: %s", v)
}
if s.State != StateCatchingUp {
t.Fatalf("state=%s", s.State)
}
}
// --- Rebuild state machine ---
func TestRebuild_SnapshotTail_FullLifecycle(t *testing.T) {
rs := NewRebuildState()
// Source select: snapshot available.
rs.SelectSource(50, true, 100)
if rs.Source != RebuildSnapshotTail {
t.Fatalf("source=%s, want snapshot_tail", rs.Source)
}
if rs.TailStartLSN != 50 || rs.TailTargetLSN != 100 {
t.Fatalf("tail: start=%d target=%d", rs.TailStartLSN, rs.TailTargetLSN)
}
// Transfer base (snapshot copy).
rs.BeginTransfer()
rs.RecordTransferProgress(50)
// Tail replay.
rs.BeginTailReplay()
rs.RecordTailReplayProgress(75)
rs.RecordTailReplayProgress(100)
if !rs.ReadyToComplete() {
t.Fatal("should be ready to complete")
}
rs.Complete()
if rs.Phase != RebuildPhaseCompleted {
t.Fatalf("phase=%s", rs.Phase)
}
}
func TestRebuild_FullBase_Lifecycle(t *testing.T) {
rs := NewRebuildState()
// No valid snapshot.
rs.SelectSource(0, false, 100)
if rs.Source != RebuildFullBase {
t.Fatalf("source=%s, want full_base", rs.Source)
}
rs.BeginTransfer()
rs.RecordTransferProgress(50)
rs.RecordTransferProgress(100)
// Full base: no tail replay needed.
if err := rs.BeginTailReplay(); err == nil {
t.Fatal("full base should not allow tail replay")
}
if !rs.ReadyToComplete() {
t.Fatal("should be ready to complete")
}
rs.Complete()
}
func TestRebuild_Abort(t *testing.T) {
rs := NewRebuildState()
rs.SelectSource(50, true, 100)
rs.BeginTransfer()
rs.Abort("epoch_bump")
if rs.Phase != RebuildPhaseAborted {
t.Fatalf("phase=%s", rs.Phase)
}
if rs.AbortReason != "epoch_bump" {
t.Fatalf("reason=%s", rs.AbortReason)
}
// Cannot complete after abort.
if err := rs.Complete(); err == nil {
t.Fatal("complete after abort should fail")
}
}
func TestRebuild_PhaseOrderEnforced(t *testing.T) {
rs := NewRebuildState()
// Cannot transfer before source select.
if err := rs.BeginTransfer(); err == nil {
t.Fatal("transfer before source select should fail")
}
rs.SelectSource(50, true, 100)
// Cannot tail replay before transfer.
if err := rs.BeginTailReplay(); err == nil {
t.Fatal("tail replay before transfer should fail")
}
rs.BeginTransfer()
// Cannot complete before reaching target.
if rs.ReadyToComplete() {
t.Fatal("should not be ready before reaching target")
}
}
func TestRebuild_TransferRegression_Rejected(t *testing.T) {
rs := NewRebuildState()
rs.SelectSource(0, false, 100)
rs.BeginTransfer()
rs.RecordTransferProgress(50)
if err := rs.RecordTransferProgress(30); err == nil {
t.Fatal("transfer regression should be rejected")
}
}
// --- E2E: Bounded catch-up → budget exceeded → rebuild ---
func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) {
primary := NewWALHistory()
for i := uint64(1); i <= 100; i++ {
primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i})
}
primary.Commit(100)
sg := NewSenderGroup()
sg.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 1},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp},
})
r1 := sg.Sender("r1:9333")
sess := r1.Session()
// Set tight budget: max 20 entries.
sess.Budget = &CatchUpBudget{MaxEntries: 20}
r1.BeginConnect(sess.ID)
outcome, _ := r1.RecordHandshakeWithOutcome(sess.ID, primary.MakeHandshakeResult(50))
if outcome != OutcomeCatchUp {
t.Fatalf("outcome=%s", outcome)
}
r1.BeginCatchUp(sess.ID, 0)
// Replay 21 entries — exceeds budget.
entries, _ := primary.EntriesInRange(50, 71)
for _, e := range entries {
r1.RecordCatchUpProgress(sess.ID, e.LSN)
}
v, _ := r1.CheckBudget(sess.ID, 0)
if v != BudgetEntriesExceeded {
t.Fatalf("budget: %s", v)
}
if r1.State != StateNeedsRebuild {
t.Fatalf("state=%s", r1.State)
}
// New rebuild assignment.
sg.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 1},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionRebuild},
})
rebuildSess := r1.Session()
r1.BeginConnect(rebuildSess.ID)
r1.RecordHandshake(rebuildSess.ID, 0, 100)
r1.BeginCatchUp(rebuildSess.ID)
r1.RecordCatchUpProgress(rebuildSess.ID, 100)
r1.CompleteSessionByID(rebuildSess.ID)
if r1.State != StateInSync {
t.Fatalf("after rebuild: state=%s", r1.State)
}
t.Logf("bounded catch-up → budget exceeded → rebuild → InSync")
}

152
sw-block/prototype/enginev2/rebuild.go

@ -0,0 +1,152 @@
package enginev2
import "fmt"
// RebuildSource identifies the recovery base for a rebuild session.
type RebuildSource string
const (
// RebuildSnapshotTail uses a trusted base snapshot/checkpoint and
// replays the retained WAL tail from that point to CommittedLSN.
// Preferred path — avoids copying the full extent.
RebuildSnapshotTail RebuildSource = "snapshot_tail"
// RebuildFullBase copies the full extent from the primary (or a
// snapshot source) when no acceptable base snapshot exists.
// Fallback path — expensive but always available.
RebuildFullBase RebuildSource = "full_base"
)
// RebuildPhase tracks progress within a rebuild session.
type RebuildPhase string
const (
RebuildPhaseInit RebuildPhase = "init"
RebuildPhaseSourceSelect RebuildPhase = "source_select" // choosing snapshot vs full
RebuildPhaseTransfer RebuildPhase = "transfer" // copying base data
RebuildPhaseTailReplay RebuildPhase = "tail_replay" // replaying WAL tail after snapshot
RebuildPhaseCompleted RebuildPhase = "completed"
RebuildPhaseAborted RebuildPhase = "aborted"
)
// RebuildState tracks the execution state of a rebuild session.
// Owned by the Sender's RecoverySession when Kind == SessionRebuild.
type RebuildState struct {
Source RebuildSource
Phase RebuildPhase
AbortReason string
// Source selection inputs.
SnapshotLSN uint64 // LSN of the best available snapshot (0 = none)
SnapshotValid bool // whether the snapshot is trustworthy
// Transfer progress.
TransferredTo uint64 // highest LSN transferred (base or extent copy)
// Tail replay progress (snapshot_tail mode only).
TailStartLSN uint64 // start of WAL tail replay (= snapshot LSN)
TailTargetLSN uint64 // committed boundary
TailReplayedTo uint64 // highest LSN replayed from tail
}
// NewRebuildState creates a rebuild state in the init phase.
func NewRebuildState() *RebuildState {
return &RebuildState{Phase: RebuildPhaseInit}
}
// SelectSource chooses the rebuild source based on snapshot availability.
// If a valid snapshot exists, uses snapshot+tail (cheaper).
// Otherwise, uses full base rebuild.
func (rs *RebuildState) SelectSource(snapshotLSN uint64, snapshotValid bool, committedLSN uint64) error {
if rs.Phase != RebuildPhaseInit {
return fmt.Errorf("rebuild: source select requires init phase, got %s", rs.Phase)
}
rs.SnapshotLSN = snapshotLSN
rs.SnapshotValid = snapshotValid
rs.Phase = RebuildPhaseSourceSelect
if snapshotValid && snapshotLSN > 0 {
rs.Source = RebuildSnapshotTail
rs.TailStartLSN = snapshotLSN
rs.TailTargetLSN = committedLSN
} else {
rs.Source = RebuildFullBase
rs.TailTargetLSN = committedLSN
}
return nil
}
// BeginTransfer starts the base data transfer phase.
func (rs *RebuildState) BeginTransfer() error {
if rs.Phase != RebuildPhaseSourceSelect {
return fmt.Errorf("rebuild: transfer requires source_select phase, got %s", rs.Phase)
}
rs.Phase = RebuildPhaseTransfer
return nil
}
// RecordTransferProgress records how much base data has been transferred.
func (rs *RebuildState) RecordTransferProgress(transferredTo uint64) error {
if rs.Phase != RebuildPhaseTransfer {
return fmt.Errorf("rebuild: progress requires transfer phase, got %s", rs.Phase)
}
if transferredTo <= rs.TransferredTo {
return fmt.Errorf("rebuild: transfer regression: %d <= %d", transferredTo, rs.TransferredTo)
}
rs.TransferredTo = transferredTo
return nil
}
// BeginTailReplay transitions to tail replay after base transfer (snapshot_tail only).
func (rs *RebuildState) BeginTailReplay() error {
if rs.Phase != RebuildPhaseTransfer {
return fmt.Errorf("rebuild: tail replay requires transfer phase, got %s", rs.Phase)
}
if rs.Source != RebuildSnapshotTail {
return fmt.Errorf("rebuild: tail replay only valid for snapshot_tail source")
}
rs.Phase = RebuildPhaseTailReplay
return nil
}
// RecordTailReplayProgress records WAL tail replay progress.
func (rs *RebuildState) RecordTailReplayProgress(replayedTo uint64) error {
if rs.Phase != RebuildPhaseTailReplay {
return fmt.Errorf("rebuild: tail progress requires tail_replay phase, got %s", rs.Phase)
}
if replayedTo <= rs.TailReplayedTo {
return fmt.Errorf("rebuild: tail regression: %d <= %d", replayedTo, rs.TailReplayedTo)
}
rs.TailReplayedTo = replayedTo
return nil
}
// ReadyToComplete checks whether the rebuild has reached its target.
func (rs *RebuildState) ReadyToComplete() bool {
switch rs.Source {
case RebuildSnapshotTail:
return rs.Phase == RebuildPhaseTailReplay && rs.TailReplayedTo >= rs.TailTargetLSN
case RebuildFullBase:
return rs.Phase == RebuildPhaseTransfer && rs.TransferredTo >= rs.TailTargetLSN
default:
return false
}
}
// Complete marks the rebuild as completed.
func (rs *RebuildState) Complete() error {
if !rs.ReadyToComplete() {
return fmt.Errorf("rebuild: not ready to complete (source=%s phase=%s)", rs.Source, rs.Phase)
}
rs.Phase = RebuildPhaseCompleted
return nil
}
// Abort marks the rebuild as aborted with a reason.
func (rs *RebuildState) Abort(reason string) {
if rs.Phase == RebuildPhaseCompleted || rs.Phase == RebuildPhaseAborted {
return
}
rs.Phase = RebuildPhaseAborted
rs.AbortReason = reason
}

52
sw-block/prototype/enginev2/sender.go

@ -294,8 +294,9 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error
// BeginCatchUp transitions the session from handshake to catch-up phase.
// Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp.
// Initializes budget tracker with startTick (pass 0 if no budget enforcement).
// Rejects: wrong sessionID, wrong phase.
func (s *Sender) BeginCatchUp(sessionID uint64) error {
func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
@ -305,11 +306,15 @@ func (s *Sender) BeginCatchUp(sessionID uint64) error {
return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase)
}
s.State = StateCatchingUp
if len(startTick) > 0 {
s.session.Tracker.StartTick = startTick[0]
s.session.Tracker.LastProgressTick = startTick[0]
}
return nil
}
// RecordCatchUpProgress records catch-up progress (highest LSN recovered).
// Mutates: session.RecoveredTo (monotonic only).
// Mutates: session.RecoveredTo (monotonic only), session.Tracker.EntriesReplayed.
// Rejects: wrong sessionID, wrong phase, progress regression, invalidated session.
func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) error {
s.mu.Lock()
@ -324,9 +329,52 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) err
return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo)
}
s.session.UpdateProgress(recoveredTo)
s.session.Tracker.EntriesReplayed++
return nil
}
// RecordCatchUpProgressAt is like RecordCatchUpProgress but also records the tick
// for budget stall detection.
func (s *Sender) RecordCatchUpProgressAt(sessionID uint64, recoveredTo uint64, tick uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Phase != PhaseCatchUp {
return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase)
}
if recoveredTo <= s.session.RecoveredTo {
return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo)
}
s.session.UpdateProgress(recoveredTo)
s.session.Tracker.EntriesReplayed++
s.session.Tracker.LastProgressTick = tick
return nil
}
// CheckBudget evaluates the session's catch-up budget at the current tick.
// Returns BudgetOK if within bounds, or the specific violation.
// If a violation is detected, the session is invalidated and the sender
// transitions to NeedsRebuild.
func (s *Sender) CheckBudget(sessionID uint64, currentTick uint64) (BudgetViolation, error) {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return BudgetOK, err
}
if s.session.Budget == nil {
return BudgetOK, nil // no budget enforcement
}
violation := s.session.Budget.Check(s.session.Tracker, currentTick)
if violation != BudgetOK {
s.session.invalidate(fmt.Sprintf("budget_%s", violation))
s.session = nil
s.State = StateNeedsRebuild
}
return violation, nil
}
// checkSessionAuthority validates that the sender has an active session
// matching the given ID. Must be called with s.mu held.
func (s *Sender) checkSessionAuthority(sessionID uint64) error {

7
sw-block/prototype/enginev2/session.go

@ -63,6 +63,13 @@ type RecoverySession struct {
TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake
TruncateToLSN uint64 // truncate entries beyond this LSN
TruncateRecorded bool // true after truncation is confirmed
// Bounded CatchUp budget (Phase 4.5).
Budget *CatchUpBudget // nil = no budget enforcement
Tracker BudgetCheck // runtime consumption
// Rebuild state (Phase 4.5). Non-nil when Kind == SessionRebuild.
Rebuild *RebuildState
}
func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession {

Loading…
Cancel
Save