diff --git a/sw-block/engine/replication/integration_test.go b/sw-block/engine/replication/integration_test.go
index 475e674c7..df19db486 100644
--- a/sw-block/engine/replication/integration_test.go
+++ b/sw-block/engine/replication/integration_test.go
@@ -29,7 +29,7 @@ func TestIntegration_ChangedAddress_ViaOrchestrator(t *testing.T) {
 	if result.Outcome != OutcomeCatchUp {
 		t.Fatalf("outcome=%s", result.Outcome)
 	}
-	o.CompleteCatchUp("vol1-r1", 100)
+	o.CompleteCatchUp("vol1-r1", CatchUpOptions{TargetLSN: 100})
 
 	s := o.Registry.Sender("vol1-r1")
 	if s.State() != StateInSync {
@@ -257,7 +257,7 @@ func TestIntegration_MultiReplica_ViaOrchestrator(t *testing.T) {
 	if r2.Outcome != OutcomeCatchUp || !r2.Proof.Recoverable {
 		t.Fatalf("r2: outcome=%s proof=%v", r2.Outcome, r2.Proof)
 	}
-	o.CompleteCatchUp("r2", 100)
+	o.CompleteCatchUp("r2", CatchUpOptions{TargetLSN: 100})
 
 	// r3: needs rebuild.
 	r3 := o.ExecuteRecovery("r3", 20, &primary)
@@ -275,9 +275,138 @@ func TestIntegration_MultiReplica_ViaOrchestrator(t *testing.T) {
 	}
 }
 
-// --- Observability ---
+// --- Orchestrated truncation (replica-ahead) ---
 
-func TestIntegration_Observability_SessionSnapshot(t *testing.T) {
+func TestIntegration_ReplicaAhead_TruncateViaOrchestrator(t *testing.T) {
+	o := NewRecoveryOrchestrator()
+	primary := RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100}
+
+	o.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+
+	// Replica ahead of committed — needs truncation.
+	result := o.ExecuteRecovery("r1", 105, &primary)
+	if result.Outcome != OutcomeCatchUp {
+		t.Fatalf("outcome=%s (replica ahead → catchup with truncation)", result.Outcome)
+	}
+
+	// CompleteCatchUp with truncation via orchestrator.
+	err := o.CompleteCatchUp("r1", CatchUpOptions{
+		TargetLSN:   100,
+		TruncateLSN: 100,
+	})
+	if err != nil {
+		t.Fatalf("catch-up with truncation: %v", err)
+	}
+
+	if o.Registry.Sender("r1").State() != StateInSync {
+		t.Fatalf("state=%s", o.Registry.Sender("r1").State())
+	}
+
+	// Log should show truncation event.
+	hasTruncation := false
+	for _, e := range o.Log.EventsFor("r1") {
+		if e.Event == "truncation_recorded" {
+			hasTruncation = true
+		}
+	}
+	if !hasTruncation {
+		t.Fatal("log should contain truncation_recorded event")
+	}
+}
+
+func TestIntegration_ReplicaAhead_NoTruncate_CompletionRejected(t *testing.T) {
+	o := NewRecoveryOrchestrator()
+	primary := RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100}
+
+	o.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+
+	o.ExecuteRecovery("r1", 105, &primary)
+
+	// CompleteCatchUp WITHOUT truncation — should be rejected.
+	err := o.CompleteCatchUp("r1", CatchUpOptions{TargetLSN: 100})
+	if err == nil {
+		t.Fatal("completion without truncation should be rejected for replica-ahead")
+	}
+
+	// Log should show rejection reason.
+	hasRejection := false
+	for _, e := range o.Log.EventsFor("r1") {
+		if e.Event == "completion_rejected" {
+			hasRejection = true
+		}
+	}
+	if !hasRejection {
+		t.Fatal("log should contain completion_rejected event")
+	}
+}
+
+// --- Orchestrated budget escalation ---
+
+func TestIntegration_BudgetEscalation_ViaOrchestrator(t *testing.T) {
+	o := NewRecoveryOrchestrator()
+	primary := RetainedHistory{HeadLSN: 1000, TailLSN: 0, CommittedLSN: 1000}
+
+	o.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+
+	// Attach budget to the session.
+	s := o.Registry.Sender("r1")
+	sessID := s.SessionID()
+	// Need to supersede with budget. Use direct attach since orchestrator
+	// doesn't yet expose budget configuration in ProcessAssignment.
+	s.InvalidateSession("budget_setup", StateDisconnected)
+	sessID, _ = s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxDurationTicks: 5}))
+
+	// ExecuteRecovery.
+	s.BeginConnect(sessID)
+	s.RecordHandshakeFromHistory(sessID, 500, &primary)
+
+	// CompleteCatchUp: started at tick 0, completing at tick 10 (> MaxDuration 5).
+	err := o.CompleteCatchUp("r1", CatchUpOptions{
+		TargetLSN:    1000,
+		StartTick:    0,
+		CompleteTick: 10,
+	})
+	if err == nil {
+		t.Fatal("should escalate on budget violation")
+	}
+
+	if s.State() != StateNeedsRebuild {
+		t.Fatalf("state=%s, want needs_rebuild", s.State())
+	}
+
+	// Log should show budget escalation.
+	hasBudgetEvent := false
+	for _, e := range o.Log.EventsFor("r1") {
+		if e.Event == "budget_escalated" {
+			hasBudgetEvent = true
+		}
+	}
+	if !hasBudgetEvent {
+		t.Fatal("log should contain budget_escalated event")
+	}
+}
+
+// --- Sender-level observability (not entry-path integration) ---
+
+func TestSenderObservability_SessionSnapshot(t *testing.T) {
 	o := NewRecoveryOrchestrator()
 	_ = RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100}
 
@@ -306,7 +435,7 @@ func TestIntegration_Observability_SessionSnapshot(t *testing.T) {
 	}
 }
 
-func TestIntegration_Observability_RebuildSnapshot(t *testing.T) {
+func TestSenderObservability_RebuildSnapshot(t *testing.T) {
 	o := NewRecoveryOrchestrator()
 	primary := RetainedHistory{
 		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
@@ -348,7 +477,7 @@ func TestIntegration_RecoveryLog_AutoPopulated(t *testing.T) {
 		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
 	})
 	o.ExecuteRecovery("r1", 80, &primary)
-	o.CompleteCatchUp("r1", 100)
+	o.CompleteCatchUp("r1", CatchUpOptions{TargetLSN: 100})
 
 	events := o.Log.EventsFor("r1")
 	// Should have: sender_added, session_created, connected, handshake, catchup_started, completed.
diff --git a/sw-block/engine/replication/orchestrator.go b/sw-block/engine/replication/orchestrator.go
index b838b4648..d32328348 100644
--- a/sw-block/engine/replication/orchestrator.go
+++ b/sw-block/engine/replication/orchestrator.go
@@ -136,28 +136,75 @@ func (o *RecoveryOrchestrator) ExecuteRecovery(replicaID string, replicaFlushedL
 	return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: s.State()}
 }
 
-// CompleteCatchUp drives catch-up from startLSN to targetLSN and completes.
-// Called after ExecuteRecovery returns OutcomeCatchUp.
-func (o *RecoveryOrchestrator) CompleteCatchUp(replicaID string, targetLSN uint64) error {
+// CatchUpOptions configures the orchestrated catch-up flow.
+type CatchUpOptions struct {
+	TargetLSN    uint64 // required: what to catch up to
+	StartTick    uint64 // tick when catch-up begins (for budget tracking)
+	CompleteTick uint64 // tick when catch-up is evaluated (for budget checking)
+	TruncateLSN  uint64 // if non-zero, record truncation before completion
+}
+
+// CompleteCatchUp drives the full catch-up lifecycle:
+//  1. BeginCatchUp (freezes target)
+//  2. RecordCatchUpProgress
+//  3. CheckBudget (if budget configured)
+//  4. RecordTruncation (if truncation required)
+//  5. CompleteSessionByID
+//
+// Logs causal reason for every rejection, escalation, or completion.
+func (o *RecoveryOrchestrator) CompleteCatchUp(replicaID string, opts CatchUpOptions) error {
 	s := o.Registry.Sender(replicaID)
 	if s == nil {
 		return fmt.Errorf("sender not found")
 	}
 	sessID := s.SessionID()
 
-	if err := s.BeginCatchUp(sessID); err != nil {
-		o.Log.Record(replicaID, sessID, "catchup_failed", err.Error())
+	// Step 1: begin catch-up (freezes target, records start tick).
+	if err := s.BeginCatchUp(sessID, opts.StartTick); err != nil {
+		o.Log.Record(replicaID, sessID, "catchup_begin_failed", err.Error())
 		return err
 	}
-	o.Log.Record(replicaID, sessID, "catchup_started", "")
+	o.Log.Record(replicaID, sessID, "catchup_started",
+		fmt.Sprintf("target=%d start_tick=%d", opts.TargetLSN, opts.StartTick))
 
-	if err := s.RecordCatchUpProgress(sessID, targetLSN); err != nil {
-		o.Log.Record(replicaID, sessID, "catchup_progress_failed", err.Error())
-		return err
+	// Step 2: record progress (skip if already converged, e.g., truncation-only case).
+	snap := s.SessionSnapshot()
+	if snap != nil && snap.RecoveredTo < opts.TargetLSN {
+		var progressTick []uint64
+		if opts.CompleteTick > 0 {
+			progressTick = []uint64{opts.CompleteTick}
+		}
+		if err := s.RecordCatchUpProgress(sessID, opts.TargetLSN, progressTick...); err != nil {
+			o.Log.Record(replicaID, sessID, "catchup_progress_failed", err.Error())
+			return err
+		}
+	}
+
+	// Step 3: check budget at completion time.
+	if opts.CompleteTick > 0 {
+		violation, err := s.CheckBudget(sessID, opts.CompleteTick)
+		if err != nil {
+			return err
+		}
+		if violation != BudgetOK {
+			o.Log.Record(replicaID, sessID, "budget_escalated", string(violation))
+			return fmt.Errorf("budget violation: %s", violation)
+		}
+	}
+
+	// Step 4: handle truncation (if required).
+	if opts.TruncateLSN > 0 {
+		if err := s.RecordTruncation(sessID, opts.TruncateLSN); err != nil {
+			o.Log.Record(replicaID, sessID, "truncation_failed", err.Error())
+			return err
+		}
+		o.Log.Record(replicaID, sessID, "truncation_recorded",
+			fmt.Sprintf("truncated_to=%d", opts.TruncateLSN))
 	}
 
+	// Step 5: complete.
 	if !s.CompleteSessionByID(sessID) {
-		o.Log.Record(replicaID, sessID, "completion_rejected", "")
+		o.Log.Record(replicaID, sessID, "completion_rejected", "session not convergent or truncation missing")
 		return fmt.Errorf("completion rejected")
 	}
 	o.Log.Record(replicaID, sessID, "completed", "in_sync")