Browse Source

fix: make frozen target intrinsic and rebuild completion exclusive

Frozen target is now unconditional:
- FrozenTargetLSN field on RecoverySession, set by BeginCatchUp
- RecordCatchUpProgress enforces FrozenTargetLSN regardless of Budget
- Catch-up is always a bounded (R, H0] contract

Rebuild completion exclusivity:
- CompleteSessionByID explicitly rejects SessionRebuild by kind
- Rebuild sessions can ONLY complete via CompleteRebuild

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 3 days ago
parent
commit
8f5070679c
  1. 17
      sw-block/prototype/enginev2/phase45_test.go
  2. 15
      sw-block/prototype/enginev2/sender.go
  3. 7
      sw-block/prototype/enginev2/session.go

17
sw-block/prototype/enginev2/phase45_test.go

@ -269,18 +269,25 @@ func TestSender_TargetFrozen_RejectsProgressBeyond(t *testing.T) {
} }
} }
func TestSender_NoBudget_TargetNotFrozen(t *testing.T) {
func TestSender_NoBudget_TargetStillFrozen(t *testing.T) {
// Target freeze is intrinsic to catch-up, not budget-dependent.
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1) s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sess, _ := s.AttachSession(1, SessionCatchUp) sess, _ := s.AttachSession(1, SessionCatchUp)
// No budget = no target freeze.
s.BeginConnect(sess.ID) s.BeginConnect(sess.ID)
s.RecordHandshake(sess.ID, 50, 100) s.RecordHandshake(sess.ID, 50, 100)
s.BeginCatchUp(sess.ID) s.BeginCatchUp(sess.ID)
// Without budget, no frozen target. Progress beyond 100 is allowed.
s.RecordCatchUpProgress(sess.ID, 100)
// Session target can be manually updated if needed (no freeze enforced).
// Frozen target enforced even without budget.
if sess.FrozenTargetLSN != 100 {
t.Fatalf("frozen target=%d, want 100", sess.FrozenTargetLSN)
}
if err := s.RecordCatchUpProgress(sess.ID, 100); err != nil {
t.Fatalf("at target: %v", err)
}
if err := s.RecordCatchUpProgress(sess.ID, 101); err == nil {
t.Fatal("beyond frozen target should be rejected even without budget")
}
} }
// --- Sender-level rebuild test --- // --- Sender-level rebuild test ---

15
sw-block/prototype/enginev2/sender.go

@ -158,6 +158,10 @@ func (s *Sender) CompleteSessionByID(sessionID uint64) bool {
return false return false
} }
sess := s.session sess := s.session
// Rebuild sessions must use CompleteRebuild, not this path.
if sess.Kind == SessionRebuild {
return false
}
// Truncation gate: if truncation was required, it must be recorded. // Truncation gate: if truncation was required, it must be recorded.
if sess.TruncateRequired && !sess.TruncateRecorded { if sess.TruncateRequired && !sess.TruncateRecorded {
return false return false
@ -310,7 +314,9 @@ func (s *Sender) BeginCatchUp(sessionID uint64, startTick ...uint64) error {
return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase) return fmt.Errorf("cannot begin catch-up: session phase=%s", s.session.Phase)
} }
s.State = StateCatchingUp s.State = StateCatchingUp
// Freeze the target: catch-up will not chase beyond this.
// Freeze the target unconditionally: catch-up is a bounded (R, H0] contract.
// The session will not chase a moving head beyond this boundary.
s.session.FrozenTargetLSN = s.session.TargetLSN
if s.session.Budget != nil { if s.session.Budget != nil {
s.session.Budget.TargetLSNAtStart = s.session.TargetLSN s.session.Budget.TargetLSNAtStart = s.session.TargetLSN
} }
@ -342,11 +348,10 @@ func (s *Sender) RecordCatchUpProgress(sessionID uint64, recoveredTo uint64, tic
if recoveredTo <= s.session.RecoveredTo { if recoveredTo <= s.session.RecoveredTo {
return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo) return fmt.Errorf("progress regression: current=%d proposed=%d", s.session.RecoveredTo, recoveredTo)
} }
// Enforce frozen target: reject progress beyond the contract boundary.
if s.session.Budget != nil && s.session.Budget.TargetLSNAtStart > 0 &&
recoveredTo > s.session.Budget.TargetLSNAtStart {
// Enforce frozen target unconditionally: catch-up is bounded to (R, H0].
if s.session.FrozenTargetLSN > 0 && recoveredTo > s.session.FrozenTargetLSN {
return fmt.Errorf("progress %d exceeds frozen target %d", return fmt.Errorf("progress %d exceeds frozen target %d",
recoveredTo, s.session.Budget.TargetLSNAtStart)
recoveredTo, s.session.FrozenTargetLSN)
} }
// Tick is mandatory when ProgressDeadlineTicks is configured. // Tick is mandatory when ProgressDeadlineTicks is configured.
if s.session.Budget != nil && s.session.Budget.ProgressDeadlineTicks > 0 && len(tick) == 0 { if s.session.Budget != nil && s.session.Budget.ProgressDeadlineTicks > 0 && len(tick) == 0 {

7
sw-block/prototype/enginev2/session.go

@ -55,9 +55,10 @@ type RecoverySession struct {
InvalidateReason string // non-empty when invalidated InvalidateReason string // non-empty when invalidated
// Progress tracking. // Progress tracking.
StartLSN uint64 // gap start (exclusive)
TargetLSN uint64 // gap end (inclusive)
RecoveredTo uint64 // highest LSN recovered so far
StartLSN uint64 // gap start (exclusive)
TargetLSN uint64 // gap end (inclusive)
FrozenTargetLSN uint64 // frozen at BeginCatchUp — catch-up will not chase beyond this
RecoveredTo uint64 // highest LSN recovered so far
// Truncation tracking: set when replica has divergent tail beyond committed. // Truncation tracking: set when replica has divergent tail beyond committed.
TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake TruncateRequired bool // true if replica FlushedLSN > CommittedLSN at handshake

Loading…
Cancel
Save