diff --git a/sw-block/engine/replication/integration_test.go b/sw-block/engine/replication/integration_test.go index 1fc304f85..28686a1b7 100644 --- a/sw-block/engine/replication/integration_test.go +++ b/sw-block/engine/replication/integration_test.go @@ -1,163 +1,149 @@ package replication -import ( - "fmt" - "testing" -) +import "testing" // ============================================================ -// Phase 05 Slice 4: Integration closure +// Phase 05 Slice 4: Integration tests via RecoveryOrchestrator // -// Tests validate V2-boundary cases through the real engine entry -// path (assignment intent → recovery → completion/escalation), -// with observability verification. +// All tests use the orchestrator as the entry path — no direct +// sender API calls for the recovery lifecycle. // ============================================================ -// --- V2 Boundary 1: Changed-address recovery through assignment --- +// --- V2 Boundary 1: Changed-address recovery --- -func TestIntegration_ChangedAddress_FullFlow(t *testing.T) { - log := NewRecoveryLog() - r := NewRegistry() - primary := RetainedHistory{ - HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, - } +func TestIntegration_ChangedAddress_ViaOrchestrator(t *testing.T) { + o := NewRecoveryOrchestrator() + primary := RetainedHistory{HeadLSN: 100, TailLSN: 30, CommittedLSN: 100} - // Initial assignment. - r.ApplyAssignment(AssignmentIntent{ + // Initial assignment + recovery. 
+ o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ - {ReplicaID: "vol1-replica1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9334", Version: 1}}, + {ReplicaID: "vol1-r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9334", Version: 1}}, }, Epoch: 1, - RecoveryTargets: map[string]SessionKind{"vol1-replica1": SessionCatchUp}, + RecoveryTargets: map[string]SessionKind{"vol1-r1": SessionCatchUp}, }) - s := r.Sender("vol1-replica1") - id := s.SessionID() - s.BeginConnect(id) - log.Record("vol1-replica1", id, "connect", "initial") - - outcome, proof, _ := s.RecordHandshakeFromHistory(id, 80, &primary) - log.Record("vol1-replica1", id, "handshake", fmt.Sprintf("outcome=%s proof=%s", outcome, proof.Reason)) - - s.BeginCatchUp(id) - s.RecordCatchUpProgress(id, 100) - s.CompleteSessionByID(id) - log.Record("vol1-replica1", id, "completed", "in_sync") + // Recovery via orchestrator. + result := o.ExecuteRecovery("vol1-r1", 80, &primary) + if result.Outcome != OutcomeCatchUp { + t.Fatalf("outcome=%s", result.Outcome) + } + o.CompleteCatchUp("vol1-r1", 100) + s := o.Registry.Sender("vol1-r1") if s.State() != StateInSync { t.Fatalf("state=%s", s.State()) } - // Replica restarts on new address — assignment with updated endpoint. - r.ApplyAssignment(AssignmentIntent{ + // Address changes — new assignment. + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ - {ReplicaID: "vol1-replica1", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", Version: 2}}, + {ReplicaID: "vol1-r1", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", Version: 2}}, }, Epoch: 1, - RecoveryTargets: map[string]SessionKind{"vol1-replica1": SessionCatchUp}, + RecoveryTargets: map[string]SessionKind{"vol1-r1": SessionCatchUp}, }) // Sender identity preserved. 
- if r.Sender("vol1-replica1") != s { + if o.Registry.Sender("vol1-r1") != s { t.Fatal("sender identity must survive address change") } - - // New session on new endpoint. - id2 := s.SessionID() - if id2 == id { - t.Fatal("should have new session ID") - } if s.Endpoint().DataAddr != "10.0.0.2:9333" { t.Fatalf("endpoint not updated: %s", s.Endpoint().DataAddr) } - s.BeginConnect(id2) - log.Record("vol1-replica1", id2, "connect", "after address change") - - o2, _, _ := s.RecordHandshakeFromHistory(id2, 100, &primary) - if o2 != OutcomeZeroGap { - t.Fatalf("o2=%s", o2) + // Zero-gap recovery on new endpoint. + result2 := o.ExecuteRecovery("vol1-r1", 100, &primary) + if result2.Outcome != OutcomeZeroGap { + t.Fatalf("outcome=%s", result2.Outcome) } - s.CompleteSessionByID(id2) - log.Record("vol1-replica1", id2, "completed", "zero_gap after address change") + // Zero-gap completes in handshake phase. + s.CompleteSessionByID(s.SessionID()) - // Observability. - events := log.EventsFor("vol1-replica1") + // Verify log has events from both cycles. + events := o.Log.EventsFor("vol1-r1") if len(events) < 4 { - t.Fatalf("expected at least 4 events, got %d", len(events)) + t.Fatalf("expected ≥4 orchestrator events, got %d", len(events)) } - t.Logf("changed-address: %d recovery events logged", len(events)) + t.Logf("changed-address: %d orchestrator events", len(events)) } -// --- V2 Boundary 2: NeedsRebuild → rebuild through assignment --- +// --- V2 Boundary 2: NeedsRebuild → rebuild --- -func TestIntegration_NeedsRebuild_ThenRebuildAssignment(t *testing.T) { - r := NewRegistry() +func TestIntegration_NeedsRebuild_Rebuild_ViaOrchestrator(t *testing.T) { + o := NewRecoveryOrchestrator() primary := RetainedHistory{ HeadLSN: 100, TailLSN: 60, CommittedLSN: 100, CheckpointLSN: 40, CheckpointTrusted: true, } - // Initial catch-up attempt fails — gap beyond retention. 
- r.ApplyAssignment(AssignmentIntent{ + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ - {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}}, + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, }, Epoch: 1, RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, }) - s := r.Sender("r1") - id := s.SessionID() - s.BeginConnect(id) - o, _, _ := s.RecordHandshakeFromHistory(id, 30, &primary) - if o != OutcomeNeedsRebuild { - t.Fatalf("should need rebuild: %s", o) + // Catch-up fails — gap beyond retention. + result := o.ExecuteRecovery("r1", 30, &primary) + if result.Outcome != OutcomeNeedsRebuild { + t.Fatalf("outcome=%s", result.Outcome) } - - // Registry status shows NeedsRebuild. - status := r.Status() - if status.Rebuilding != 1 { - t.Fatalf("rebuilding=%d", status.Rebuilding) + if result.Proof.Recoverable { + t.Fatal("proof should not be recoverable") + } + if result.FinalState != StateNeedsRebuild { + t.Fatalf("state=%s", result.FinalState) } - // Rebuild assignment from coordinator. - r.ApplyAssignment(AssignmentIntent{ + // Rebuild assignment. + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ - {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}}, + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, }, Epoch: 1, RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, }) - id2 := s.SessionID() - s.BeginConnect(id2) - s.RecordHandshake(id2, 0, 100) - - // History-driven rebuild source: checkpoint at 40 but tail at 60 → - // CheckpointLSN (40) < TailLSN (60) → unreplayable → full base. - s.SelectRebuildFromHistory(id2, &primary) - s.BeginRebuildTransfer(id2) - s.RecordRebuildTransferProgress(id2, 100) - s.CompleteRebuild(id2) + // Rebuild via orchestrator. 
+ if err := o.CompleteRebuild("r1", &primary); err != nil { + t.Fatalf("rebuild: %v", err) + } + s := o.Registry.Sender("r1") if s.State() != StateInSync { t.Fatalf("state=%s", s.State()) } - status = r.Status() - if status.InSync != 1 { - t.Fatalf("in_sync=%d", status.InSync) + // Log should show escalation + rebuild events. + events := o.Log.EventsFor("r1") + hasEscalation := false + hasRebuild := false + for _, e := range events { + if e.Event == "escalated" { + hasEscalation = true + } + if e.Event == "rebuild_completed" { + hasRebuild = true + } + } + if !hasEscalation { + t.Fatal("log should contain escalation event") + } + if !hasRebuild { + t.Fatal("log should contain rebuild_completed event") } } -// --- V2 Boundary 3: Epoch bump during recovery → new assignment --- +// --- V2 Boundary 3: Epoch bump during recovery --- -func TestIntegration_EpochBump_DuringRecovery(t *testing.T) { - r := NewRegistry() +func TestIntegration_EpochBump_ViaOrchestrator(t *testing.T) { + o := NewRecoveryOrchestrator() primary := RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100} - r.ApplyAssignment(AssignmentIntent{ + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, }, @@ -165,21 +151,18 @@ func TestIntegration_EpochBump_DuringRecovery(t *testing.T) { RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, }) - s := r.Sender("r1") - id := s.SessionID() - s.BeginConnect(id) - - // Epoch bumps (failover) mid-recovery. - r.InvalidateEpoch(2) - s.UpdateEpoch(2) + // Epoch bumps mid-recovery. + o.InvalidateEpoch(2) + o.Registry.Sender("r1").UpdateEpoch(2) - // Old session dead. - if err := s.RecordHandshake(id, 0, 100); err == nil { - t.Fatal("old session should be rejected after epoch bump") + // Old session is dead — ExecuteRecovery should fail. 
+ result := o.ExecuteRecovery("r1", 100, &primary) + if result.Error == nil { + t.Fatal("should fail on stale session") } // New assignment at epoch 2. - r.ApplyAssignment(AssignmentIntent{ + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, }, @@ -187,29 +170,38 @@ func TestIntegration_EpochBump_DuringRecovery(t *testing.T) { RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, }) - id2 := s.SessionID() - s.BeginConnect(id2) - o, _, _ := s.RecordHandshakeFromHistory(id2, 100, &primary) - if o != OutcomeZeroGap { - t.Fatalf("epoch 2: %s", o) + result2 := o.ExecuteRecovery("r1", 100, &primary) + if result2.Outcome != OutcomeZeroGap { + t.Fatalf("epoch 2: %s", result2.Outcome) } - s.CompleteSessionByID(id2) + o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID()) - if s.State() != StateInSync { - t.Fatalf("state=%s", s.State()) + if o.Registry.Sender("r1").State() != StateInSync { + t.Fatalf("state=%s", o.Registry.Sender("r1").State()) + } + + // Log should show epoch invalidation. 
+ hasInvalidation := false + for _, e := range o.Log.Events() { + if e.Event == "epoch_invalidation" { + hasInvalidation = true + } + } + if !hasInvalidation { + t.Fatal("log should contain epoch_invalidation event") } } // --- V2 Boundary 4: Multi-replica mixed outcomes --- -func TestIntegration_MultiReplica_MixedOutcomes(t *testing.T) { - r := NewRegistry() +func TestIntegration_MultiReplica_ViaOrchestrator(t *testing.T) { + o := NewRecoveryOrchestrator() primary := RetainedHistory{ HeadLSN: 100, TailLSN: 40, CommittedLSN: 100, CheckpointLSN: 50, CheckpointTrusted: true, } - r.ApplyAssignment(AssignmentIntent{ + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, {ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "r2:9333", Version: 1}}, @@ -224,100 +216,121 @@ func TestIntegration_MultiReplica_MixedOutcomes(t *testing.T) { }) // r1: zero-gap. - r1 := r.Sender("r1") - id1 := r1.SessionID() - r1.BeginConnect(id1) - o1, _, _ := r1.RecordHandshakeFromHistory(id1, 100, &primary) - if o1 != OutcomeZeroGap { - t.Fatalf("r1: %s", o1) + r1 := o.ExecuteRecovery("r1", 100, &primary) + if r1.Outcome != OutcomeZeroGap { + t.Fatalf("r1: %s", r1.Outcome) } - r1.CompleteSessionByID(id1) + o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID()) // r2: catch-up. - r2 := r.Sender("r2") - id2 := r2.SessionID() - r2.BeginConnect(id2) - o2, p2, _ := r2.RecordHandshakeFromHistory(id2, 60, &primary) - if o2 != OutcomeCatchUp || !p2.Recoverable { - t.Fatalf("r2: outcome=%s proof=%v", o2, p2) + r2 := o.ExecuteRecovery("r2", 60, &primary) + if r2.Outcome != OutcomeCatchUp || !r2.Proof.Recoverable { + t.Fatalf("r2: outcome=%s proof=%v", r2.Outcome, r2.Proof) } - r2.BeginCatchUp(id2) - r2.RecordCatchUpProgress(id2, 100) - r2.CompleteSessionByID(id2) + o.CompleteCatchUp("r2", 100) // r3: needs rebuild. 
- r3 := r.Sender("r3") - id3 := r3.SessionID() - r3.BeginConnect(id3) - o3, p3, _ := r3.RecordHandshakeFromHistory(id3, 20, &primary) - if o3 != OutcomeNeedsRebuild || p3.Recoverable { - t.Fatalf("r3: outcome=%s proof=%v", o3, p3) + r3 := o.ExecuteRecovery("r3", 20, &primary) + if r3.Outcome != OutcomeNeedsRebuild { + t.Fatalf("r3: %s", r3.Outcome) } // Registry status. - status := r.Status() + status := o.Registry.Status() if status.InSync != 2 { t.Fatalf("in_sync=%d", status.InSync) } if status.Rebuilding != 1 { t.Fatalf("rebuilding=%d", status.Rebuilding) } - if status.TotalCount != 3 { - t.Fatalf("total=%d", status.TotalCount) - } } // --- Observability --- -func TestIntegration_RegistryStatus_Snapshot(t *testing.T) { - r := NewRegistry() - r.ApplyAssignment(AssignmentIntent{ +func TestIntegration_Observability_SessionSnapshot(t *testing.T) { + o := NewRecoveryOrchestrator() + _ = RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100} + + o.ProcessAssignment(AssignmentIntent{ Replicas: []ReplicaAssignment{ {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, - {ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "r2:9333", Version: 1}}, }, - Epoch: 1, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + // After handshake with replica ahead → truncation required. + s := o.Registry.Sender("r1") + id := s.SessionID() + s.BeginConnect(id) + s.RecordHandshakeWithOutcome(id, HandshakeResult{ + ReplicaFlushedLSN: 105, CommittedLSN: 100, RetentionStartLSN: 0, }) - status := r.Status() - if status.TotalCount != 2 { - t.Fatalf("total=%d", status.TotalCount) + snap := s.SessionSnapshot() + if !snap.TruncateRequired { + t.Fatal("snapshot should show truncation required") } - if len(status.Senders) != 2 { - t.Fatalf("senders=%d", len(status.Senders)) + if snap.TruncateToLSN != 100 { + t.Fatalf("truncate to=%d", snap.TruncateToLSN) } +} - // Both disconnected (no recovery started). 
- for _, ss := range status.Senders { - if ss.State != StateDisconnected { - t.Fatalf("%s: state=%s", ss.ReplicaID, ss.State) - } - if ss.Session != nil { - t.Fatalf("%s: should have no session", ss.ReplicaID) - } +func TestIntegration_Observability_RebuildSnapshot(t *testing.T) { + o := NewRecoveryOrchestrator() + primary := RetainedHistory{ + HeadLSN: 100, TailLSN: 30, CommittedLSN: 100, + CheckpointLSN: 50, CheckpointTrusted: true, } -} -func TestIntegration_RecoveryLog(t *testing.T) { - log := NewRecoveryLog() + o.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild}, + }) - log.Record("r1", 1, "connect", "initial") - log.Record("r1", 1, "handshake", "catch-up") - log.Record("r2", 2, "connect", "rebuild") - log.Record("r1", 1, "completed", "in_sync") + s := o.Registry.Sender("r1") + id := s.SessionID() + s.BeginConnect(id) + s.RecordHandshake(id, 0, 100) + s.SelectRebuildFromHistory(id, &primary) - all := log.Events() - if len(all) != 4 { - t.Fatalf("events=%d", len(all)) + snap := s.SessionSnapshot() + if snap.RebuildSource != RebuildSnapshotTail { + t.Fatalf("rebuild source=%s", snap.RebuildSource) + } + if snap.RebuildPhase != RebuildPhaseSourceSelect { + t.Fatalf("rebuild phase=%s", snap.RebuildPhase) } +} + +func TestIntegration_RecoveryLog_AutoPopulated(t *testing.T) { + o := NewRecoveryOrchestrator() + primary := RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100} - r1Events := log.EventsFor("r1") - if len(r1Events) != 3 { - t.Fatalf("r1 events=%d", len(r1Events)) + o.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + o.ExecuteRecovery("r1", 80, &primary) + o.CompleteCatchUp("r1", 100) + + events := 
o.Log.EventsFor("r1") + // Should have: sender_added, session_created, connected, handshake, catchup_started, completed. + if len(events) < 5 { + t.Fatalf("expected ≥5 auto-populated events, got %d", len(events)) } - r2Events := log.EventsFor("r2") - if len(r2Events) != 1 { - t.Fatalf("r2 events=%d", len(r2Events)) + // All events came from the orchestrator, not manual test logging. + for _, e := range events { + if e.Event == "" { + t.Fatal("event should have non-empty type") + } } + t.Logf("auto-populated log: %d events", len(events)) } diff --git a/sw-block/engine/replication/orchestrator.go b/sw-block/engine/replication/orchestrator.go new file mode 100644 index 000000000..a80c8ad48 --- /dev/null +++ b/sw-block/engine/replication/orchestrator.go @@ -0,0 +1,186 @@ +package replication + +import "fmt" + +// RecoveryOrchestrator drives the recovery lifecycle from assignment intent +// through execution to completion/escalation. It is the integrated entry +// path above raw Sender APIs — callers interact with the orchestrator, +// not with individual sender execution methods. +// +// The orchestrator owns: +// - assignment processing (reconcile + session creation) +// - handshake evaluation (from RetainedHistory) +// - recovery execution (catch-up or rebuild through completion) +// - automatic event logging at every lifecycle transition +type RecoveryOrchestrator struct { + Registry *Registry + Log *RecoveryLog +} + +// NewRecoveryOrchestrator creates an orchestrator with a fresh registry and log. +func NewRecoveryOrchestrator() *RecoveryOrchestrator { + return &RecoveryOrchestrator{ + Registry: NewRegistry(), + Log: NewRecoveryLog(), + } +} + +// ProcessAssignment applies an assignment intent and logs the result. 
+func (o *RecoveryOrchestrator) ProcessAssignment(intent AssignmentIntent) AssignmentResult { + result := o.Registry.ApplyAssignment(intent) + for _, id := range result.Added { + o.Log.Record(id, 0, "sender_added", "") + } + for _, id := range result.Removed { + o.Log.Record(id, 0, "sender_removed", "") + } + for _, id := range result.SessionsCreated { + s := o.Registry.Sender(id) + o.Log.Record(id, s.SessionID(), "session_created", "") + } + for _, id := range result.SessionsSuperseded { + s := o.Registry.Sender(id) + o.Log.Record(id, s.SessionID(), "session_superseded", "") + } + for _, id := range result.SessionsFailed { + o.Log.Record(id, 0, "session_failed", "") + } + return result +} + +// RecoveryResult captures the outcome of a single replica recovery attempt. +type RecoveryResult struct { + ReplicaID string + Outcome RecoveryOutcome + Proof *RecoverabilityProof + FinalState ReplicaState + Error error +} + +// ExecuteRecovery runs the full recovery flow for a single replica: +// connect → handshake (from history) → catch-up or escalate. +// +// For catch-up outcomes, the caller provides entries via the returned +// CatchUpHandle. For rebuild outcomes, the sender is left at NeedsRebuild +// and requires a separate rebuild assignment. +func (o *RecoveryOrchestrator) ExecuteRecovery(replicaID string, replicaFlushedLSN uint64, history *RetainedHistory) RecoveryResult { + s := o.Registry.Sender(replicaID) + if s == nil { + return RecoveryResult{ReplicaID: replicaID, Error: fmt.Errorf("sender not found")} + } + + sessID := s.SessionID() + if sessID == 0 { + return RecoveryResult{ReplicaID: replicaID, Error: fmt.Errorf("no session")} + } + + // Connect. + if err := s.BeginConnect(sessID); err != nil { + o.Log.Record(replicaID, sessID, "connect_failed", err.Error()) + return RecoveryResult{ReplicaID: replicaID, FinalState: s.State(), Error: err} + } + o.Log.Record(replicaID, sessID, "connected", "") + + // Handshake from history. 
+ outcome, proof, err := s.RecordHandshakeFromHistory(sessID, replicaFlushedLSN, history) + if err != nil { + o.Log.Record(replicaID, sessID, "handshake_failed", err.Error()) + return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: s.State(), Error: err} + } + o.Log.Record(replicaID, sessID, "handshake", fmt.Sprintf("outcome=%s", outcome)) + + if outcome == OutcomeNeedsRebuild { + o.Log.Record(replicaID, sessID, "escalated", fmt.Sprintf("needs_rebuild: %s", proof.Reason)) + return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: StateNeedsRebuild} + } + + return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: s.State()} +} + +// CompleteCatchUp drives catch-up from startLSN to targetLSN and completes. +// Called after ExecuteRecovery returns OutcomeCatchUp. +func (o *RecoveryOrchestrator) CompleteCatchUp(replicaID string, targetLSN uint64) error { + s := o.Registry.Sender(replicaID) + if s == nil { + return fmt.Errorf("sender not found") + } + sessID := s.SessionID() + + if err := s.BeginCatchUp(sessID); err != nil { + o.Log.Record(replicaID, sessID, "catchup_failed", err.Error()) + return err + } + o.Log.Record(replicaID, sessID, "catchup_started", "") + + if err := s.RecordCatchUpProgress(sessID, targetLSN); err != nil { + o.Log.Record(replicaID, sessID, "catchup_progress_failed", err.Error()) + return err + } + + if !s.CompleteSessionByID(sessID) { + o.Log.Record(replicaID, sessID, "completion_rejected", "") + return fmt.Errorf("completion rejected") + } + o.Log.Record(replicaID, sessID, "completed", "in_sync") + return nil +} + +// CompleteRebuild drives the rebuild from history and completes. +// Called after a rebuild assignment when the sender is at NeedsRebuild. 
+func (o *RecoveryOrchestrator) CompleteRebuild(replicaID string, history *RetainedHistory) error { + s := o.Registry.Sender(replicaID) + if s == nil { + return fmt.Errorf("sender not found") + } + sessID := s.SessionID() + + if err := s.BeginConnect(sessID); err != nil { + o.Log.Record(replicaID, sessID, "rebuild_connect_failed", err.Error()) + return err + } + o.Log.Record(replicaID, sessID, "rebuild_connected", "") + + if err := s.RecordHandshake(sessID, 0, history.CommittedLSN); err != nil { + return err + } + + if err := s.SelectRebuildFromHistory(sessID, history); err != nil { + o.Log.Record(replicaID, sessID, "rebuild_source_failed", err.Error()) + return err + } + + snap := s.SessionSnapshot() + o.Log.Record(replicaID, sessID, "rebuild_source_selected", fmt.Sprintf("kind=%s", snap.Kind)) + + if err := s.BeginRebuildTransfer(sessID); err != nil { + return err + } + + // Determine transfer target based on rebuild source. + source, snapLSN := history.RebuildSourceDecision() + if source == RebuildSnapshotTail { + s.RecordRebuildTransferProgress(sessID, snapLSN) + if err := s.BeginRebuildTailReplay(sessID); err != nil { + return err + } + s.RecordRebuildTailProgress(sessID, history.CommittedLSN) + } else { + s.RecordRebuildTransferProgress(sessID, history.CommittedLSN) + } + + if err := s.CompleteRebuild(sessID); err != nil { + o.Log.Record(replicaID, sessID, "rebuild_failed", err.Error()) + return err + } + o.Log.Record(replicaID, sessID, "rebuild_completed", "in_sync") + return nil +} + +// InvalidateEpoch invalidates all stale sessions and logs the event. 
+func (o *RecoveryOrchestrator) InvalidateEpoch(newEpoch uint64) int { + count := o.Registry.InvalidateEpoch(newEpoch) + if count > 0 { + o.Log.Record("", 0, "epoch_invalidation", fmt.Sprintf("epoch=%d invalidated=%d", newEpoch, count)) + } + return count +} diff --git a/sw-block/engine/replication/sender.go b/sw-block/engine/replication/sender.go index f6b43e359..cbc657506 100644 --- a/sw-block/engine/replication/sender.go +++ b/sw-block/engine/replication/sender.go @@ -47,7 +47,7 @@ func (s *Sender) SessionSnapshot() *SessionSnapshot { if s.session == nil { return nil } - return &SessionSnapshot{ + snap := &SessionSnapshot{ ID: s.session.id, ReplicaID: s.session.replicaID, Epoch: s.session.epoch, @@ -59,7 +59,15 @@ func (s *Sender) SessionSnapshot() *SessionSnapshot { FrozenTargetLSN: s.session.frozenTargetLSN, RecoveredTo: s.session.recoveredTo, Active: s.session.Active(), + TruncateRequired: s.session.truncateRequired, + TruncateToLSN: s.session.truncateToLSN, + TruncateRecorded: s.session.truncateRecorded, } + if s.session.rebuild != nil { + snap.RebuildSource = s.session.rebuild.Source + snap.RebuildPhase = s.session.rebuild.Phase + } + return snap } // SessionSnapshot is a read-only copy of session state for external inspection. @@ -75,6 +83,15 @@ type SessionSnapshot struct { FrozenTargetLSN uint64 RecoveredTo uint64 Active bool + + // Truncation state. + TruncateRequired bool + TruncateToLSN uint64 + TruncateRecorded bool + + // Rebuild state (nil if not a rebuild session). + RebuildSource RebuildSource + RebuildPhase RebuildPhase } // SessionID returns the current session ID, or 0 if no session.