From 3f0048cbd9d1421694929d738baf276acc797197 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sun, 29 Mar 2026 14:33:06 -0700 Subject: [PATCH] feat: add bounded CatchUp budget and Rebuild mode state machine (Phase 4.5 P0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bounded CatchUp: - CatchUpBudget: MaxDurationTicks, MaxEntries, ProgressDeadlineTicks - BudgetCheck: runtime consumption tracker (StartTick, EntriesReplayed, LastProgressTick) - Sender.CheckBudget: evaluates budget, escalates to NeedsRebuild on violation - RecordCatchUpProgressAt: tracks progress tick for stall detection - BeginCatchUp accepts optional startTick for budget tracking Rebuild state machine: - RebuildSource: snapshot_tail (preferred) vs full_base (fallback) - RebuildPhase: init → source_select → transfer → tail_replay → completed|aborted - SelectSource: chooses based on snapshot availability - Phase ordering enforced, transfer regression rejected - ReadyToComplete validates target reached 13 new tests: budget enforcement (duration, entries, stall, no-budget), sender budget integration, rebuild lifecycle (snapshot+tail, full base, abort, phase order, regression), E2E bounded catch-up → rebuild. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/prototype/enginev2/budget.go | 59 ++++ sw-block/prototype/enginev2/phase45_test.go | 306 ++++++++++++++++++++ sw-block/prototype/enginev2/rebuild.go | 152 ++++++++++ sw-block/prototype/enginev2/sender.go | 52 +++- sw-block/prototype/enginev2/session.go | 7 + 5 files changed, 574 insertions(+), 2 deletions(-) create mode 100644 sw-block/prototype/enginev2/budget.go create mode 100644 sw-block/prototype/enginev2/phase45_test.go create mode 100644 sw-block/prototype/enginev2/rebuild.go diff --git a/sw-block/prototype/enginev2/budget.go b/sw-block/prototype/enginev2/budget.go new file mode 100644 index 000000000..2e4a01623 --- /dev/null +++ b/sw-block/prototype/enginev2/budget.go @@ -0,0 +1,59 @@ +package enginev2 + +// CatchUpBudget defines the bounded resource contract for a catch-up session. +// CatchUp is a short-gap, bounded recovery path. When any budget limit is +// exceeded, the session must escalate to NeedsRebuild rather than continuing +// indefinitely. +// +// A zero value for any limit field (MaxDurationTicks, MaxEntries, ProgressDeadlineTicks) means "no limit" for that dimension. +type CatchUpBudget struct { + // TargetLSNAtStart is frozen at handshake time. The catch-up target + // does not drift — if the primary writes more, the session completes + // at the original target and then transitions to normal shipping. + TargetLSNAtStart uint64 + + // MaxDurationTicks is the hard time budget. If the session has not + // converged within this many ticks, it escalates. + MaxDurationTicks uint64 + + // MaxEntries is the maximum number of WAL entries to replay. + // Prevents a "short gap" from silently becoming a full rebuild. + MaxEntries uint64 + + // ProgressDeadlineTicks is the stall detection window. If no progress + // is recorded within this many ticks, the session escalates. + ProgressDeadlineTicks uint64 +} + +// BudgetCheck tracks runtime budget consumption for a catch-up session. 
+type BudgetCheck struct { + StartTick uint64 // tick when catch-up began + EntriesReplayed uint64 // total entries replayed so far + LastProgressTick uint64 // tick of last recorded progress (set by BeginCatchUp and RecordCatchUpProgressAt) +} + +// BudgetViolation identifies which budget limit was exceeded. +type BudgetViolation string + +const ( + BudgetOK BudgetViolation = "" + BudgetDurationExceeded BudgetViolation = "duration_exceeded" + BudgetEntriesExceeded BudgetViolation = "entries_exceeded" + BudgetProgressStalled BudgetViolation = "progress_stalled" +) + +// Check evaluates the budget against the current tick. Returns BudgetOK +// if all limits are within bounds, or the specific violation. +func (b *CatchUpBudget) Check(tracker BudgetCheck, currentTick uint64) BudgetViolation { + if b.MaxDurationTicks > 0 && currentTick-tracker.StartTick > b.MaxDurationTicks { + return BudgetDurationExceeded + } + if b.MaxEntries > 0 && tracker.EntriesReplayed > b.MaxEntries { + return BudgetEntriesExceeded + } + if b.ProgressDeadlineTicks > 0 && tracker.LastProgressTick > 0 && + currentTick-tracker.LastProgressTick > b.ProgressDeadlineTicks { + return BudgetProgressStalled + } + return BudgetOK +} diff --git a/sw-block/prototype/enginev2/phase45_test.go b/sw-block/prototype/enginev2/phase45_test.go new file mode 100644 index 000000000..97aacac8b --- /dev/null +++ b/sw-block/prototype/enginev2/phase45_test.go @@ -0,0 +1,306 @@ +package enginev2 + +import "testing" + +// ============================================================ +// Phase 4.5 P0: Bounded CatchUp + Rebuild mode state machine +// ============================================================ + +// --- CatchUp Budget --- + +func TestBudget_DurationExceeded(t *testing.T) { + b := &CatchUpBudget{MaxDurationTicks: 10} + tracker := BudgetCheck{StartTick: 0} + + if v := b.Check(tracker, 5); v != BudgetOK { + t.Fatalf("tick 5: %s", v) + } + if v := b.Check(tracker, 11); v != BudgetDurationExceeded { + t.Fatalf("tick 11: got %s, want duration_exceeded", 
v) + } +} + +func TestBudget_EntriesExceeded(t *testing.T) { + b := &CatchUpBudget{MaxEntries: 100} + tracker := BudgetCheck{EntriesReplayed: 50} + + if v := b.Check(tracker, 0); v != BudgetOK { + t.Fatalf("50 entries: %s", v) + } + tracker.EntriesReplayed = 101 + if v := b.Check(tracker, 0); v != BudgetEntriesExceeded { + t.Fatalf("101 entries: got %s, want entries_exceeded", v) + } +} + +func TestBudget_ProgressStalled(t *testing.T) { + b := &CatchUpBudget{ProgressDeadlineTicks: 5} + tracker := BudgetCheck{LastProgressTick: 10} + + if v := b.Check(tracker, 14); v != BudgetOK { + t.Fatalf("tick 14: %s", v) + } + if v := b.Check(tracker, 16); v != BudgetProgressStalled { + t.Fatalf("tick 16: got %s, want progress_stalled", v) + } +} + +func TestBudget_NoBudget_AlwaysOK(t *testing.T) { + b := &CatchUpBudget{} // all zeros = no limits + tracker := BudgetCheck{EntriesReplayed: 999999, StartTick: 0} + if v := b.Check(tracker, 999999); v != BudgetOK { + t.Fatalf("no budget: %s", v) + } +} + +// --- Sender budget integration --- + +func TestSender_Budget_EscalatesOnViolation(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + + // Set budget. + sess.Budget = &CatchUpBudget{MaxDurationTicks: 10} + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + s.BeginCatchUp(sess.ID, 0) // start at tick 0 + + s.RecordCatchUpProgress(sess.ID, 10) + + // Within budget at tick 5. + v, _ := s.CheckBudget(sess.ID, 5) + if v != BudgetOK { + t.Fatalf("tick 5: %s", v) + } + + // Exceeded at tick 11. + v, _ = s.CheckBudget(sess.ID, 11) + if v != BudgetDurationExceeded { + t.Fatalf("tick 11: got %s, want duration_exceeded", v) + } + + // Session invalidated, sender at NeedsRebuild. 
+ if sess.Active() { + t.Fatal("session should be invalidated after budget violation") + } + if s.State != StateNeedsRebuild { + t.Fatalf("state=%s, want needs_rebuild", s.State) + } +} + +func TestSender_Budget_ProgressStall_Escalates(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + sess.Budget = &CatchUpBudget{ProgressDeadlineTicks: 5} + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + s.BeginCatchUp(sess.ID, 10) + + s.RecordCatchUpProgressAt(sess.ID, 20, 12) // progress at tick 12 + + // Stalled: no progress for 6 ticks (12→18 > deadline 5). + v, _ := s.CheckBudget(sess.ID, 18) + if v != BudgetProgressStalled { + t.Fatalf("tick 18: got %s, want progress_stalled", v) + } + if s.State != StateNeedsRebuild { + t.Fatalf("state=%s", s.State) + } +} + +func TestSender_NoBudget_NeverEscalates(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + // No budget set. + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + s.BeginCatchUp(sess.ID, 0) + + v, _ := s.CheckBudget(sess.ID, 999999) + if v != BudgetOK { + t.Fatalf("no budget: %s", v) + } + if s.State != StateCatchingUp { + t.Fatalf("state=%s", s.State) + } +} + +// --- Rebuild state machine --- + +func TestRebuild_SnapshotTail_FullLifecycle(t *testing.T) { + rs := NewRebuildState() + + // Source select: snapshot available. + rs.SelectSource(50, true, 100) + if rs.Source != RebuildSnapshotTail { + t.Fatalf("source=%s, want snapshot_tail", rs.Source) + } + if rs.TailStartLSN != 50 || rs.TailTargetLSN != 100 { + t.Fatalf("tail: start=%d target=%d", rs.TailStartLSN, rs.TailTargetLSN) + } + + // Transfer base (snapshot copy). + rs.BeginTransfer() + rs.RecordTransferProgress(50) + + // Tail replay. 
+ rs.BeginTailReplay() + rs.RecordTailReplayProgress(75) + rs.RecordTailReplayProgress(100) + + if !rs.ReadyToComplete() { + t.Fatal("should be ready to complete") + } + rs.Complete() + if rs.Phase != RebuildPhaseCompleted { + t.Fatalf("phase=%s", rs.Phase) + } +} + +func TestRebuild_FullBase_Lifecycle(t *testing.T) { + rs := NewRebuildState() + + // No valid snapshot. + rs.SelectSource(0, false, 100) + if rs.Source != RebuildFullBase { + t.Fatalf("source=%s, want full_base", rs.Source) + } + + rs.BeginTransfer() + rs.RecordTransferProgress(50) + rs.RecordTransferProgress(100) + + // Full base: no tail replay needed. + if err := rs.BeginTailReplay(); err == nil { + t.Fatal("full base should not allow tail replay") + } + + if !rs.ReadyToComplete() { + t.Fatal("should be ready to complete") + } + rs.Complete() +} + +func TestRebuild_Abort(t *testing.T) { + rs := NewRebuildState() + rs.SelectSource(50, true, 100) + rs.BeginTransfer() + + rs.Abort("epoch_bump") + if rs.Phase != RebuildPhaseAborted { + t.Fatalf("phase=%s", rs.Phase) + } + if rs.AbortReason != "epoch_bump" { + t.Fatalf("reason=%s", rs.AbortReason) + } + + // Cannot complete after abort. + if err := rs.Complete(); err == nil { + t.Fatal("complete after abort should fail") + } +} + +func TestRebuild_PhaseOrderEnforced(t *testing.T) { + rs := NewRebuildState() + + // Cannot transfer before source select. + if err := rs.BeginTransfer(); err == nil { + t.Fatal("transfer before source select should fail") + } + + rs.SelectSource(50, true, 100) + + // Cannot tail replay before transfer. + if err := rs.BeginTailReplay(); err == nil { + t.Fatal("tail replay before transfer should fail") + } + + rs.BeginTransfer() + + // Cannot complete before reaching target. 
+ if rs.ReadyToComplete() { + t.Fatal("should not be ready before reaching target") + } +} + +func TestRebuild_TransferRegression_Rejected(t *testing.T) { + rs := NewRebuildState() + rs.SelectSource(0, false, 100) + rs.BeginTransfer() + rs.RecordTransferProgress(50) + + if err := rs.RecordTransferProgress(30); err == nil { + t.Fatal("transfer regression should be rejected") + } +} + +// --- E2E: Bounded catch-up → budget exceeded → rebuild --- + +func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) { + primary := NewWALHistory() + for i := uint64(1); i <= 100; i++ { + primary.Append(WALEntry{LSN: i, Epoch: 1, Block: i % 8, Value: i}) + } + primary.Commit(100) + + sg := NewSenderGroup() + sg.ApplyAssignment(AssignmentIntent{ + Endpoints: map[string]Endpoint{ + "r1:9333": {DataAddr: "r1:9333", Version: 1}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp}, + }) + + r1 := sg.Sender("r1:9333") + sess := r1.Session() + + // Set tight budget: max 20 entries. + sess.Budget = &CatchUpBudget{MaxEntries: 20} + + r1.BeginConnect(sess.ID) + outcome, _ := r1.RecordHandshakeWithOutcome(sess.ID, primary.MakeHandshakeResult(50)) + if outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", outcome) + } + + r1.BeginCatchUp(sess.ID, 0) + + // Replay 21 entries — exceeds budget. + entries, _ := primary.EntriesInRange(50, 71) + for _, e := range entries { + r1.RecordCatchUpProgress(sess.ID, e.LSN) + } + + v, _ := r1.CheckBudget(sess.ID, 0) + if v != BudgetEntriesExceeded { + t.Fatalf("budget: %s", v) + } + if r1.State != StateNeedsRebuild { + t.Fatalf("state=%s", r1.State) + } + + // New rebuild assignment. 
+ sg.ApplyAssignment(AssignmentIntent{ + Endpoints: map[string]Endpoint{ + "r1:9333": {DataAddr: "r1:9333", Version: 1}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1:9333": SessionRebuild}, + }) + + rebuildSess := r1.Session() + r1.BeginConnect(rebuildSess.ID) + r1.RecordHandshake(rebuildSess.ID, 0, 100) + r1.BeginCatchUp(rebuildSess.ID) + r1.RecordCatchUpProgress(rebuildSess.ID, 100) + r1.CompleteSessionByID(rebuildSess.ID) + + if r1.State != StateInSync { + t.Fatalf("after rebuild: state=%s", r1.State) + } + t.Logf("bounded catch-up → budget exceeded → rebuild → InSync") +} diff --git a/sw-block/prototype/enginev2/rebuild.go b/sw-block/prototype/enginev2/rebuild.go new file mode 100644 index 000000000..e4dc7b2fb --- /dev/null +++ b/sw-block/prototype/enginev2/rebuild.go @@ -0,0 +1,152 @@ +package enginev2 + +import "fmt" + +// RebuildSource identifies the recovery base for a rebuild session. +type RebuildSource string + +const ( + // RebuildSnapshotTail uses a trusted base snapshot/checkpoint and + // replays the retained WAL tail from that point to CommittedLSN. + // Preferred path — avoids copying the full extent. + RebuildSnapshotTail RebuildSource = "snapshot_tail" + + // RebuildFullBase copies the full extent from the primary (or a + // snapshot source) when no acceptable base snapshot exists. + // Fallback path — expensive but always available. + RebuildFullBase RebuildSource = "full_base" +) + +// RebuildPhase tracks progress within a rebuild session. +type RebuildPhase string + +const ( + RebuildPhaseInit RebuildPhase = "init" + RebuildPhaseSourceSelect RebuildPhase = "source_select" // choosing snapshot vs full + RebuildPhaseTransfer RebuildPhase = "transfer" // copying base data + RebuildPhaseTailReplay RebuildPhase = "tail_replay" // replaying WAL tail after snapshot + RebuildPhaseCompleted RebuildPhase = "completed" + RebuildPhaseAborted RebuildPhase = "aborted" +) + +// RebuildState tracks the execution state of a rebuild session. 
+// Owned by the Sender's RecoverySession when Kind == SessionRebuild. +type RebuildState struct { + Source RebuildSource + Phase RebuildPhase + AbortReason string + + // Source selection inputs. + SnapshotLSN uint64 // LSN of the best available snapshot (0 = none) + SnapshotValid bool // whether the snapshot is trustworthy + + // Transfer progress. + TransferredTo uint64 // highest LSN transferred (base or extent copy) + + // Tail replay progress (snapshot_tail mode only). + TailStartLSN uint64 // start of WAL tail replay (= snapshot LSN) + TailTargetLSN uint64 // committed boundary + TailReplayedTo uint64 // highest LSN replayed from tail +} + +// NewRebuildState creates a rebuild state in the init phase. +func NewRebuildState() *RebuildState { + return &RebuildState{Phase: RebuildPhaseInit} +} + +// SelectSource chooses the rebuild source based on snapshot availability. +// If a valid snapshot exists, uses snapshot+tail (cheaper). +// Otherwise, uses full base rebuild. +func (rs *RebuildState) SelectSource(snapshotLSN uint64, snapshotValid bool, committedLSN uint64) error { + if rs.Phase != RebuildPhaseInit { + return fmt.Errorf("rebuild: source select requires init phase, got %s", rs.Phase) + } + rs.SnapshotLSN = snapshotLSN + rs.SnapshotValid = snapshotValid + rs.Phase = RebuildPhaseSourceSelect + + if snapshotValid && snapshotLSN > 0 { + rs.Source = RebuildSnapshotTail + rs.TailStartLSN = snapshotLSN + rs.TailTargetLSN = committedLSN + } else { + rs.Source = RebuildFullBase + rs.TailTargetLSN = committedLSN + } + return nil +} + +// BeginTransfer starts the base data transfer phase. +func (rs *RebuildState) BeginTransfer() error { + if rs.Phase != RebuildPhaseSourceSelect { + return fmt.Errorf("rebuild: transfer requires source_select phase, got %s", rs.Phase) + } + rs.Phase = RebuildPhaseTransfer + return nil +} + +// RecordTransferProgress records how much base data has been transferred. 
+func (rs *RebuildState) RecordTransferProgress(transferredTo uint64) error { + if rs.Phase != RebuildPhaseTransfer { + return fmt.Errorf("rebuild: progress requires transfer phase, got %s", rs.Phase) + } + if transferredTo <= rs.TransferredTo { + return fmt.Errorf("rebuild: transfer regression: %d <= %d", transferredTo, rs.TransferredTo) + } + rs.TransferredTo = transferredTo + return nil +} + +// BeginTailReplay transitions to tail replay after base transfer (snapshot_tail only). +func (rs *RebuildState) BeginTailReplay() error { + if rs.Phase != RebuildPhaseTransfer { + return fmt.Errorf("rebuild: tail replay requires transfer phase, got %s", rs.Phase) + } + if rs.Source != RebuildSnapshotTail { + return fmt.Errorf("rebuild: tail replay only valid for snapshot_tail source") + } + rs.Phase = RebuildPhaseTailReplay + return nil +} + +// RecordTailReplayProgress records WAL tail replay progress. +func (rs *RebuildState) RecordTailReplayProgress(replayedTo uint64) error { + if rs.Phase != RebuildPhaseTailReplay { + return fmt.Errorf("rebuild: tail progress requires tail_replay phase, got %s", rs.Phase) + } + if replayedTo <= rs.TailReplayedTo { + return fmt.Errorf("rebuild: tail regression: %d <= %d", replayedTo, rs.TailReplayedTo) + } + rs.TailReplayedTo = replayedTo + return nil +} + +// ReadyToComplete checks whether the rebuild has reached its target. +func (rs *RebuildState) ReadyToComplete() bool { + switch rs.Source { + case RebuildSnapshotTail: + return rs.Phase == RebuildPhaseTailReplay && rs.TailReplayedTo >= rs.TailTargetLSN + case RebuildFullBase: + return rs.Phase == RebuildPhaseTransfer && rs.TransferredTo >= rs.TailTargetLSN + default: + return false + } +} + +// Complete marks the rebuild as completed. 
+func (rs *RebuildState) Complete() error { + if !rs.ReadyToComplete() { + return fmt.Errorf("rebuild: not ready to complete (source=%s phase=%s)", rs.Source, rs.Phase) + } + rs.Phase = RebuildPhaseCompleted + return nil +} + +// Abort marks the rebuild as aborted with a reason. +func (rs *RebuildState) Abort(reason string) { + if rs.Phase == RebuildPhaseCompleted || rs.Phase == RebuildPhaseAborted { + return + } + rs.Phase = RebuildPhaseAborted + rs.AbortReason = reason +} diff --git a/sw-block/prototype/enginev2/sender.go b/sw-block/prototype/enginev2/sender.go index b82941308..9dd66209a 100644 --- a/sw-block/prototype/enginev2/sender.go +++ b/sw-block/prototype/enginev2/sender.go @@ -294,8 +294,9 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error // BeginCatchUp transitions the session from handshake to catch-up phase. // Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp. +// Initializes the budget tracker (StartTick and LastProgressTick) from the optional startTick; omit it when no budget is enforced. // Rejects: wrong sessionID, wrong phase. -func (s *Sender) BeginCatchUp(sessionID uint64) error { +func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error { s.mu.Lock() defer s.mu.Unlock() if err := s.checkSessionAuthority(sessionID); err != nil { @@ -305,11 +306,15 @@ func (s *Sender) BeginCatchUp(sessionID uint64) error { return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase) } s.State = StateCatchingUp + if len(startTick) > 0 { + s.session.Tracker.StartTick = startTick[0] + s.session.Tracker.LastProgressTick = startTick[0] + } return nil } // RecordCatchUpProgress records catch-up progress (highest LSN recovered). -// Mutates: session.RecoveredTo (monotonic only). +// Mutates: session.RecoveredTo (monotonic only), session.Tracker.EntriesReplayed. // Rejects: wrong sessionID, wrong phase, progress regression, invalidated session. 
func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) error { s.mu.Lock() @@ -324,9 +329,52 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) err return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo) } s.session.UpdateProgress(recoveredTo) + s.session.Tracker.EntriesReplayed++ + return nil +} + +// RecordCatchUpProgressAt is like RecordCatchUpProgress but also records the tick +// for budget stall detection. +func (s *Sender) RecordCatchUpProgressAt(sessionID uint64, recoveredTo uint64, tick uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Phase != PhaseCatchUp { + return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase) + } + if recoveredTo <= s.session.RecoveredTo { + return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo) + } + s.session.UpdateProgress(recoveredTo) + s.session.Tracker.EntriesReplayed++ + s.session.Tracker.LastProgressTick = tick return nil } +// CheckBudget evaluates the session's catch-up budget at the current tick. +// Returns BudgetOK if within bounds, or the specific violation. +// If a violation is detected, the session is invalidated and the sender +// transitions to NeedsRebuild. 
+func (s *Sender) CheckBudget(sessionID uint64, currentTick uint64) (BudgetViolation, error) { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return BudgetOK, err + } + if s.session.Budget == nil { + return BudgetOK, nil // no budget enforcement + } + violation := s.session.Budget.Check(s.session.Tracker, currentTick) + if violation != BudgetOK { + s.session.invalidate(fmt.Sprintf("budget_%s", violation)) + s.session = nil + s.State = StateNeedsRebuild + } + return violation, nil +} + // checkSessionAuthority validates that the sender has an active session // matching the given ID. Must be called with s.mu held. func (s *Sender) checkSessionAuthority(sessionID uint64) error { diff --git a/sw-block/prototype/enginev2/session.go b/sw-block/prototype/enginev2/session.go index f88418c7d..3920b80c8 100644 --- a/sw-block/prototype/enginev2/session.go +++ b/sw-block/prototype/enginev2/session.go @@ -63,6 +63,13 @@ type RecoverySession struct { TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake TruncateToLSN uint64 // truncate entries beyond this LSN TruncateRecorded bool // true after truncation is confirmed + + // Bounded CatchUp budget (Phase 4.5). + Budget *CatchUpBudget // nil = no budget enforcement + Tracker BudgetCheck // runtime consumption + + // Rebuild state (Phase 4.5). Non-nil when Kind == SessionRebuild. + Rebuild *RebuildState } func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession {