Browse Source

fix: wire rebuild FSM into sender, enforce frozen target, fix entry counting

Rebuild execution path:
- newRecoverySession auto-initializes RebuildState for SessionRebuild
- Sender rebuild APIs: SelectRebuildSource, BeginRebuildTransfer,
  RecordRebuildTransferProgress, BeginRebuildTailReplay,
  RecordRebuildTailProgress, CompleteRebuild
- All rebuild APIs are sender-authority-gated by sessionID
- E2E rebuild test now drives through rebuild FSM, not catch-up APIs

Bounded CatchUp enforcement:
- BeginCatchUp freezes TargetLSNAtStart from session.TargetLSN
- RecordCatchUpProgress rejects progress beyond frozen target
- Entry counting uses LSN delta (recoveredTo - previous), not call count
- Merged RecordCatchUpProgressAt into RecordCatchUpProgress (tick param)

5 new tests: target-frozen enforcement, sender-level rebuild via
rebuild APIs, reject non-rebuild, reject stale ID on rebuild.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
5b66a85f92
  1. 131
      sw-block/prototype/enginev2/phase45_test.go
  2. 138
      sw-block/prototype/enginev2/sender.go
  3. 6
      sw-block/prototype/enginev2/session.go

131
sw-block/prototype/enginev2/phase45_test.go

@ -98,7 +98,7 @@ func TestSender_Budget_ProgressStall_Escalates(t *testing.T) {
s.RecordHandshake(sess.ID, 0, 100)
s.BeginCatchUp(sess.ID, 10)
s.RecordCatchUpProgressAt(sess.ID, 20, 12) // progress at tick 12
s.RecordCatchUpProgress(sess.ID, 20, 12) // progress at tick 12
// Stalled: no progress for 6 ticks (12→18 > deadline 5).
v, _ := s.CheckBudget(sess.ID, 18)
@ -237,6 +237,116 @@ func TestRebuild_TransferRegression_Rejected(t *testing.T) {
}
}
// --- Target-frozen enforcement ---
func TestSender_TargetFrozen_RejectsProgressBeyond(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp)
sess.Budget = &CatchUpBudget{} // budget present = target freezes
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 50, 100) // target = 100
s.BeginCatchUp(sess.ID)
// Budget.TargetLSNAtStart should be frozen to 100.
if sess.Budget.TargetLSNAtStart != 100 {
t.Fatalf("frozen target=%d, want 100", sess.Budget.TargetLSNAtStart)
}
// Progress within target works.
if err := s.RecordCatchUpProgress(sess.ID, 80); err != nil {
t.Fatalf("progress to 80: %v", err)
}
// Progress AT target works.
if err := s.RecordCatchUpProgress(sess.ID, 100); err != nil {
t.Fatalf("progress to 100: %v", err)
}
// Progress BEYOND frozen target — rejected.
if err := s.RecordCatchUpProgress(sess.ID, 101); err == nil {
t.Fatal("progress beyond frozen target should be rejected")
}
}
func TestSender_NoBudget_TargetNotFrozen(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp)
// No budget = no target freeze.
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 50, 100)
s.BeginCatchUp(sess.ID)
// Without budget, no frozen target. Progress beyond 100 is allowed.
s.RecordCatchUpProgress(sess.ID, 100)
// Session target can be manually updated if needed (no freeze enforced).
}
// --- Sender-level rebuild test ---
func TestSender_RebuildViaRebuildAPIs(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionRebuild)
if sess.Rebuild == nil {
t.Fatal("rebuild session should auto-initialize RebuildState")
}
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 0, 100)
// Select source via sender.
if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err != nil {
t.Fatalf("select source: %v", err)
}
if sess.Rebuild.Source != RebuildSnapshotTail {
t.Fatalf("source=%s", sess.Rebuild.Source)
}
// Transfer via sender.
s.BeginRebuildTransfer(sess.ID)
s.RecordRebuildTransferProgress(sess.ID, 40)
// Tail replay via sender.
s.BeginRebuildTailReplay(sess.ID)
s.RecordRebuildTailProgress(sess.ID, 100)
// Complete via sender.
if err := s.CompleteRebuild(sess.ID); err != nil {
t.Fatalf("complete rebuild: %v", err)
}
if s.State != StateInSync {
t.Fatalf("state=%s, want in_sync", s.State)
}
if s.Session() != nil {
t.Fatal("session should be nil after rebuild completion")
}
}
func TestSender_RebuildAPIs_RejectNonRebuild(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp) // NOT rebuild
s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 0, 100)
if err := s.SelectRebuildSource(sess.ID, 40, true, 100); err == nil {
t.Fatal("rebuild API should reject non-rebuild session")
}
}
func TestSender_RebuildAPIs_RejectStaleID(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionRebuild)
oldID := sess.ID
s.UpdateEpoch(2)
s.AttachSession(2, SessionRebuild)
if err := s.SelectRebuildSource(oldID, 40, true, 100); err == nil {
t.Fatal("stale sessionID should be rejected by rebuild APIs")
}
}
// --- E2E: Bounded catch-up → budget exceeded → rebuild ---
func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) {
@ -293,11 +403,24 @@ func TestE2E_BoundedCatchUp_EscalatesToRebuild(t *testing.T) {
})
rebuildSess := r1.Session()
if rebuildSess.Rebuild == nil {
t.Fatal("rebuild session should have RebuildState initialized")
}
// Drive rebuild through the sender-owned rebuild path.
r1.BeginConnect(rebuildSess.ID)
r1.RecordHandshake(rebuildSess.ID, 0, 100)
r1.BeginCatchUp(rebuildSess.ID)
r1.RecordCatchUpProgress(rebuildSess.ID, 100)
r1.CompleteSessionByID(rebuildSess.ID)
// Select snapshot+tail source.
r1.SelectRebuildSource(rebuildSess.ID, 30, true, 100)
r1.BeginRebuildTransfer(rebuildSess.ID)
r1.RecordRebuildTransferProgress(rebuildSess.ID, 30)
r1.BeginRebuildTailReplay(rebuildSess.ID)
r1.RecordRebuildTailProgress(rebuildSess.ID, 100)
if err := r1.CompleteRebuild(rebuildSess.ID); err != nil {
t.Fatalf("rebuild completion: %v", err)
}
if r1.State != StateInSync {
t.Fatalf("after rebuild: state=%s", r1.State)

138
sw-block/prototype/enginev2/sender.go

@ -293,8 +293,9 @@ func (s *Sender) RecordTruncation(sessionID uint64, truncatedToLSN uint64) error
}
// BeginCatchUp transitions the session from handshake to catch-up phase.
// Freezes TargetLSNAtStart from the session's TargetLSN — catch-up will not
// chase a moving head beyond this boundary.
// Mutates: session.Phase → PhaseCatchUp. Sender.State → StateCatchingUp.
// Initializes budget tracker with startTick (pass 0 if no budget enforcement).
// Rejects: wrong sessionID, wrong phase.
func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
s.mu.Lock()
@ -306,6 +307,10 @@ func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase)
}
s.State = StateCatchingUp
// Freeze the target: catch-up will not chase beyond this.
if s.session.Budget != nil {
s.session.Budget.TargetLSNAtStart = s.session.TargetLSN
}
if len(startTick) > 0 {
s.session.Tracker.StartTick = startTick[0]
s.session.Tracker.LastProgressTick = startTick[0]
@ -314,9 +319,12 @@ func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
}
// RecordCatchUpProgress records catch-up progress (highest LSN recovered).
// Mutates: session.RecoveredTo (monotonic only), session.Tracker.EntriesReplayed.
// Rejects: wrong sessionID, wrong phase, progress regression, invalidated session.
func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) error {
// Uses LSN delta (recoveredTo - previous RecoveredTo) for entry counting,
// not per-call increment. This ensures MaxEntries budget is accurate even
// with batched reporting.
// Rejects: wrong sessionID, wrong phase, progress regression, progress
// beyond frozen TargetLSNAtStart (if budget is set).
func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tick ...uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
@ -328,28 +336,19 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64) err
if recoveredTo <= s.session.RecoveredTo {
return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo)
}
s.session.UpdateProgress(recoveredTo)
s.session.Tracker.EntriesReplayed++
return nil
}
// RecordCatchUpProgressAt is like RecordCatchUpProgress but also records the tick
// for budget stall detection.
func (s *Sender) RecordCatchUpProgressAt(sessionID uint64, recoveredTo uint64, tick uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Phase != PhaseCatchUp {
return fmt.Errorf("cannot record progress: session phase=%s, want catchup", s.session.Phase)
}
if recoveredTo <= s.session.RecoveredTo {
return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo)
// Enforce frozen target: reject progress beyond the contract boundary.
if s.session.Budget != nil && s.session.Budget.TargetLSNAtStart > 0 &&
recoveredTo > s.session.Budget.TargetLSNAtStart {
return fmt.Errorf("progress %d exceeds frozen target %d",
recoveredTo, s.session.Budget.TargetLSNAtStart)
}
// Entry counting by LSN delta, not call count.
delta := recoveredTo - s.session.RecoveredTo
s.session.Tracker.EntriesReplayed += delta
s.session.UpdateProgress(recoveredTo)
s.session.Tracker.EntriesReplayed++
s.session.Tracker.LastProgressTick = tick
if len(tick) > 0 {
s.session.Tracker.LastProgressTick = tick[0]
}
return nil
}
@ -375,6 +374,97 @@ func (s *Sender) CheckBudget(sessionID uint64, currentTick uint64) (BudgetViolat
return violation, nil
}
// === Rebuild execution APIs (sender-owned) ===
// SelectRebuildSource chooses the rebuild source and initializes the rebuild FSM.
// Requires: active rebuild session, sessionID match, session in PhaseHandshake.
func (s *Sender) SelectRebuildSource(sessionID uint64, snapshotLSN uint64, snapshotValid bool, committedLSN uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Kind != SessionRebuild {
return fmt.Errorf("not a rebuild session (kind=%s)", s.session.Kind)
}
if s.session.Rebuild == nil {
return fmt.Errorf("rebuild state not initialized")
}
return s.session.Rebuild.SelectSource(snapshotLSN, snapshotValid, committedLSN)
}
// BeginRebuildTransfer starts the base data transfer phase.
func (s *Sender) BeginRebuildTransfer(sessionID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Rebuild == nil {
return fmt.Errorf("no rebuild state")
}
return s.session.Rebuild.BeginTransfer()
}
// RecordRebuildTransferProgress records base transfer progress.
func (s *Sender) RecordRebuildTransferProgress(sessionID uint64, transferredTo uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Rebuild == nil {
return fmt.Errorf("no rebuild state")
}
return s.session.Rebuild.RecordTransferProgress(transferredTo)
}
// BeginRebuildTailReplay starts WAL tail replay after base transfer (snapshot+tail only).
func (s *Sender) BeginRebuildTailReplay(sessionID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Rebuild == nil {
return fmt.Errorf("no rebuild state")
}
return s.session.Rebuild.BeginTailReplay()
}
// RecordRebuildTailProgress records WAL tail replay progress.
func (s *Sender) RecordRebuildTailProgress(sessionID uint64, replayedTo uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Rebuild == nil {
return fmt.Errorf("no rebuild state")
}
return s.session.Rebuild.RecordTailReplayProgress(replayedTo)
}
// CompleteRebuild completes the rebuild session and transitions the sender
// to InSync. Requires the rebuild FSM to be ReadyToComplete.
func (s *Sender) CompleteRebuild(sessionID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.checkSessionAuthority(sessionID); err != nil {
return err
}
if s.session.Rebuild == nil {
return fmt.Errorf("no rebuild state")
}
if err := s.session.Rebuild.Complete(); err != nil {
return err
}
s.session.complete()
s.session = nil
s.State = StateInSync
return nil
}
// checkSessionAuthority validates that the sender has an active session
// matching the given ID. Must be called with s.mu held.
func (s *Sender) checkSessionAuthority(sessionID uint64) error {

6
sw-block/prototype/enginev2/session.go

@ -73,13 +73,17 @@ type RecoverySession struct {
}
func newRecoverySession(replicaID string, epoch uint64, kind SessionKind) *RecoverySession {
return &RecoverySession{
rs := &RecoverySession{
ID: sessionIDCounter.Add(1),
ReplicaID: replicaID,
Epoch: epoch,
Kind: kind,
Phase: PhaseInit,
}
if kind == SessionRebuild {
rs.Rebuild = NewRebuildState()
}
return rs
}
// Active returns true if the session has not been completed or invalidated.

Loading…
Cancel
Save