From 5b66a85f9250a4d7bd407061d7b1849b0f62ceb4 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Sun, 29 Mar 2026 15:16:56 -0700 Subject: [PATCH] fix: wire rebuild FSM into sender, enforce frozen target, fix entry counting Rebuild execution path: - newRecoverySession auto-initializes RebuildState for SessionRebuild - Sender rebuild APIs: SelectRebuildSource, BeginRebuildTransfer, RecordRebuildTransferProgress, BeginRebuildTailReplay, RecordRebuildTailProgress, CompleteRebuild - All rebuild APIs are sender-authority-gated by sessionID - E2E rebuild test now drives through rebuild FSM, not catch-up APIs Bounded CatchUp enforcement: - BeginCatchUp freezes TargetLSNAtStart from session.TargetLSN - RecordCatchUpProgress rejects progress beyond frozen target - Entry counting uses LSN delta (recoveredTo - previous), not call count - Merged RecordCatchUpProgressAt into RecordCatchUpProgress (tick param) 5 new tests: target-frozen enforcement, sender-level rebuild via rebuild APIs, reject non-rebuild, reject stale ID on rebuild. Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/prototype/enginev2/phase45_test.go | 131 ++++++++++++++++++- sw-block/prototype/enginev2/sender.go | 138 ++++++++++++++++---- sw-block/prototype/enginev2/session.go | 6 +- 3 files changed, 246 insertions(+), 29 deletions(-) diff --git a/sw-block/prototype/enginev2/phase45_test.go b/sw-block/prototype/enginev2/phase45_test.go index 97aacac8b..9bb145ed7 100644 --- a/sw-block/prototype/enginev2/phase45_test.go +++ b/sw-block/prototype/enginev2/phase45_test.go @@ -98,7 +98,7 @@ func TestSender_Budget_ProgressStall_Escalates(t *testing.T) { s.RecordHandshake(sess.ID, 0, 100) s.BeginCatchUp(sess.ID, 10) - s.RecordCatchUpProgressAt(sess.ID, 20, 12) // progress at tick 12 + s.RecordCatchUpProgress(sess.ID, 20, 12) // progress at tick 12 // Stalled: no progress for 6 ticks (12→18 > deadline 5). v, _ := s.CheckBudget(sess.ID, 18) @@ -237,6 +237,116 @@ func TestRebuild_TransferRegression_Rejected(t *testing.T) { } } +// --- Target-frozen enforcement --- + +func TestSender_TargetFrozen_RejectsProgressBeyond(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + sess.Budget = &CatchUpBudget{} // budget present = target freezes + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 50, 100) // target = 100 + s.BeginCatchUp(sess.ID) + + // Budget.TargetLSNAtStart should be frozen to 100. + if sess.Budget.TargetLSNAtStart != 100 { + t.Fatalf("frozen target=%d, want 100", sess.Budget.TargetLSNAtStart) + } + + // Progress within target works. + if err := s.RecordCatchUpProgress(sess.ID, 80); err != nil { + t.Fatalf("progress to 80: %v", err) + } + + // Progress AT target works. + if err := s.RecordCatchUpProgress(sess.ID, 100); err != nil { + t.Fatalf("progress to 100: %v", err) + } + + // Progress BEYOND frozen target — rejected. + if err := s.RecordCatchUpProgress(sess.ID, 101); err == nil { + t.Fatal("progress beyond frozen target should be rejected") + } +} + +func TestSender_NoBudget_TargetNotFrozen(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) + // No budget = no target freeze. + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 50, 100) + s.BeginCatchUp(sess.ID) + + // Without budget, no frozen target. Progress beyond 100 is allowed. + s.RecordCatchUpProgress(sess.ID, 100) + // Session target can be manually updated if needed (no freeze enforced). +} + +// --- Sender-level rebuild test --- + +func TestSender_RebuildViaRebuildAPIs(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionRebuild) + + if sess.Rebuild == nil { + t.Fatal("rebuild session should auto-initialize RebuildState") + } + + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + + // Select source via sender. + if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err != nil { + t.Fatalf("select source: %v", err) + } + if sess.Rebuild.Source != RebuildSnapshotTail { + t.Fatalf("source=%s", sess.Rebuild.Source) + } + + // Transfer via sender. + s.BeginRebuildTransfer(sess.ID) + s.RecordRebuildTransferProgress(sess.ID, 40) + + // Tail replay via sender. + s.BeginRebuildTailReplay(sess.ID) + s.RecordRebuildTailProgress(sess.ID, 100) + + // Complete via sender. + if err := s.CompleteRebuild(sess.ID); err != nil { + t.Fatalf("complete rebuild: %v", err) + } + if s.State != StateInSync { + t.Fatalf("state=%s, want in_sync", s.State) + } + if s.Session() != nil { + t.Fatal("session should be nil after rebuild completion") + } +} + +func TestSender_RebuildAPIs_RejectNonRebuild(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionCatchUp) // NOT rebuild + s.BeginConnect(sess.ID) + s.RecordHandshake(sess.ID, 0, 100) + + if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err == nil { + t.Fatal("rebuild API should reject non-rebuild session") + } +} + +func TestSender_RebuildAPIs_RejectStaleID(t *testing.T) { + s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) + sess, _ := s.AttachSession(1, SessionRebuild) + oldID := sess.ID + s.UpdateEpoch(2) + s.AttachSession(2, SessionRebuild) + + if err := s.SelectRebuildSource(oldID, 40, true, 100); err == nil { + t.Fatal("stale sessionID should be rejected by rebuild APIs") + } +} + // --- E2E: Bounded catch-up → budget exceeded → rebuild --- func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) { @@ -293,11 +403,24 @@ func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) { }) rebuildSess := r1.Session() + if rebuildSess.Rebuild == nil { + t.Fatal("rebuild session should have RebuildState initialized") + } + + // Drive rebuild through the sender-owned rebuild path. r1.BeginConnect(rebuildSess.ID) r1.RecordHandshake(rebuildSess.ID, 0, 100) - r1.BeginCatchUp(rebuildSess.ID) - r1.RecordCatchUpProgress(rebuildSess.ID, 100) - r1.CompleteSessionByID(rebuildSess.ID) + + // Select snapshot+tail source. + r1.SelectRebuildSource(rebuildSess.ID, 30, true, 100) + r1.BeginRebuildTransfer(rebuildSess.ID) + r1.RecordRebuildTransferProgress(rebuildSess.ID, 30) + r1.BeginRebuildTailReplay(rebuildSess.ID) + r1.RecordRebuildTailProgress(rebuildSess.ID, 100) + + if err := r1.CompleteRebuild(rebuildSess.ID); err != nil { + t.Fatalf("rebuild completion: %v", err) + } if r1.State != StateInSync { t.Fatalf("after rebuild: state=%s", r1.State) diff --git a/sw-block/prototype/enginev2/sender.go b/sw-block/prototype/enginev2/sender.go index 9dd66209a..3b1d92da7 100644 --- a/sw-block/prototype/enginev2/sender.go +++ b/sw-block/prototype/enginev2/sender.go @@ -293,8 +293,9 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error } // BeginCatchUp transitions the session from handshake to catch-up phase. +// Freezes TargetLSNAtStart from the session's TargetLSN — catch-up will not +// chase a moving head beyond this boundary. // Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp. -// Initializes budget tracker with startTick (pass 0 if no budget enforcement). // Rejects: wrong sessionID, wrong phase. func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error { s.mu.Lock() @@ -306,6 +307,10 @@ func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error { return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase) } s.State = StateCatchingUp + // Freeze the target: catch-up will not chase beyond this. + if s.session.Budget != nil { + s.session.Budget.TargetLSNAtStart = s.session.TargetLSN + } if len(startTick) > 0 { s.session.Tracker.StartTick = startTick[0] s.session.Tracker.LastProgressTick = startTick[0] @@ -314,9 +319,12 @@ func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error { } // RecordCatchUpProgress records catch-up progress (highest LSN recovered). -// Mutates: session.RecoveredTo (monotonic only), session.Tracker.EntriesReplayed. -// Rejects: wrong sessionID, wrong phase, progress regression, invalidated session. -func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) error { +// Uses LSN delta (recoveredTo - previous RecoveredTo) for entry counting, +// not per-call increment. This ensures MaxEntries budget is accurate even +// with batched reporting. +// Rejects: wrong sessionID, wrong phase, progress regression, progress +// beyond frozen TargetLSNAtStart (if budget is set). +func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tick ...uint64) error { s.mu.Lock() defer s.mu.Unlock() if err := s.checkSessionAuthority(sessionID); err != nil { @@ -328,28 +336,19 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) err if recoveredTo <= s.session.RecoveredTo { return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo) } - s.session.UpdateProgress(recoveredTo) - s.session.Tracker.EntriesReplayed++ - return nil -} - -// RecordCatchUpProgressAt is like RecordCatchUpProgress but also records the tick -// for budget stall detection. -func (s *Sender) RecordCatchUpProgressAt(sessionID uint64, recoveredTo uint64, tick uint64) error { - s.mu.Lock() - defer s.mu.Unlock() - if err := s.checkSessionAuthority(sessionID); err != nil { - return err - } - if s.session.Phase != PhaseCatchUp { - return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase) - } - if recoveredTo <= s.session.RecoveredTo { - return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo) + // Enforce frozen target: reject progress beyond the contract boundary. + if s.session.Budget != nil && s.session.Budget.TargetLSNAtStart > 0 && + recoveredTo > s.session.Budget.TargetLSNAtStart { + return fmt.Errorf("progress %d exceeds frozen target %d", + recoveredTo, s.session.Budget.TargetLSNAtStart) } + // Entry counting by LSN delta, not call count. + delta := recoveredTo - s.session.RecoveredTo + s.session.Tracker.EntriesReplayed += delta s.session.UpdateProgress(recoveredTo) - s.session.Tracker.EntriesReplayed++ - s.session.Tracker.LastProgressTick = tick + if len(tick) > 0 { + s.session.Tracker.LastProgressTick = tick[0] + } return nil } @@ -375,6 +374,97 @@ func (s *Sender) CheckBudget(sessionID uint64, currentTick uint64) (BudgetViolat return violation, nil } +// === Rebuild execution APIs (sender-owned) === + +// SelectRebuildSource chooses the rebuild source and initializes the rebuild FSM. +// Requires: active rebuild session, sessionID match, session in PhaseHandshake. +func (s *Sender) SelectRebuildSource(sessionID uint64, snapshotLSN uint64, snapshotValid bool, committedLSN uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Kind != SessionRebuild { + return fmt.Errorf("not a rebuild session (kind=%s)", s.session.Kind) + } + if s.session.Rebuild == nil { + return fmt.Errorf("rebuild state not initialized") + } + return s.session.Rebuild.SelectSource(snapshotLSN, snapshotValid, committedLSN) +} + +// BeginRebuildTransfer starts the base data transfer phase. +func (s *Sender) BeginRebuildTransfer(sessionID uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Rebuild == nil { + return fmt.Errorf("no rebuild state") + } + return s.session.Rebuild.BeginTransfer() +} + +// RecordRebuildTransferProgress records base transfer progress. +func (s *Sender) RecordRebuildTransferProgress(sessionID uint64, transferredTo uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Rebuild == nil { + return fmt.Errorf("no rebuild state") + } + return s.session.Rebuild.RecordTransferProgress(transferredTo) +} + +// BeginRebuildTailReplay starts WAL tail replay after base transfer (snapshot+tail only). +func (s *Sender) BeginRebuildTailReplay(sessionID uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Rebuild == nil { + return fmt.Errorf("no rebuild state") + } + return s.session.Rebuild.BeginTailReplay() +} + +// RecordRebuildTailProgress records WAL tail replay progress. +func (s *Sender) RecordRebuildTailProgress(sessionID uint64, replayedTo uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Rebuild == nil { + return fmt.Errorf("no rebuild state") + } + return s.session.Rebuild.RecordTailReplayProgress(replayedTo) +} + +// CompleteRebuild completes the rebuild session and transitions the sender +// to InSync. Requires the rebuild FSM to be ReadyToComplete. +func (s *Sender) CompleteRebuild(sessionID uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if err := s.checkSessionAuthority(sessionID); err != nil { + return err + } + if s.session.Rebuild == nil { + return fmt.Errorf("no rebuild state") + } + if err := s.session.Rebuild.Complete(); err != nil { + return err + } + s.session.complete() + s.session = nil + s.State = StateInSync + return nil +} + // checkSessionAuthority validates that the sender has an active session // matching the given ID. Must be called with s.mu held. func (s *Sender) checkSessionAuthority(sessionID uint64) error { diff --git a/sw-block/prototype/enginev2/session.go b/sw-block/prototype/enginev2/session.go index 3920b80c8..f4eb3d5c4 100644 --- a/sw-block/prototype/enginev2/session.go +++ b/sw-block/prototype/enginev2/session.go @@ -73,13 +73,17 @@ type RecoverySession struct { } func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession { - return &RecoverySession{ + rs := &RecoverySession{ ID: sessionIDCounter.Add(1), ReplicaID: replicaID, Epoch: epoch, Kind: kind, Phase: PhaseInit, } + if kind == SessionRebuild { + rs.Rebuild = NewRebuildState() + } + return rs } // Active returns true if the session has not been completed or invalidated.