From 5cdee4a0113975bf418f4dc9e8225a1ae7824073 Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 01:01:53 -0700 Subject: [PATCH] fix: orchestrator owns zero-gap completion and per-replica invalidation logging Zero-gap completion: - ExecuteRecovery auto-completes zero-gap sessions (no sender call needed) - RecoveryResult.FinalState = StateInSync for zero-gap Epoch transition: - UpdateSenderEpoch: orchestrator-owned epoch advancement with auto-log - InvalidateEpoch: per-replica session_invalidated events (not aggregate) Endpoint-change invalidation: - ProcessAssignment detects session ID change from endpoint update - Logs per-replica session_invalidated with "endpoint_changed" reason All integration tests now use orchestrator exclusively for core lifecycle. No direct sender API calls for recovery execution in integration tests. 1 new test: EndpointChange_LogsInvalidation Co-Authored-By: Claude Opus 4.6 (1M context) --- .../engine/replication/integration_test.go | 62 ++++++++++---- sw-block/engine/replication/orchestrator.go | 80 ++++++++++++++++++- 2 files changed, 123 insertions(+), 19 deletions(-) diff --git a/sw-block/engine/replication/integration_test.go b/sw-block/engine/replication/integration_test.go index 28686a1b7..475e674c7 100644 --- a/sw-block/engine/replication/integration_test.go +++ b/sw-block/engine/replication/integration_test.go @@ -53,13 +53,14 @@ func TestIntegration_ChangedAddress_ViaOrchestrator(t *testing.T) { t.Fatalf("endpoint not updated: %s", s.Endpoint().DataAddr) } - // Zero-gap recovery on new endpoint. + // Zero-gap recovery on new endpoint — orchestrator handles completion. result2 := o.ExecuteRecovery("vol1-r1", 100, &primary) if result2.Outcome != OutcomeZeroGap { t.Fatalf("outcome=%s", result2.Outcome) } - // Zero-gap completes in handshake phase. - s.CompleteSessionByID(s.SessionID()) + if result2.FinalState != StateInSync { + t.Fatalf("zero-gap should complete to InSync, got %s", result2.FinalState) + } // Verify log has events from both cycles. events := o.Log.EventsFor("vol1-r1") @@ -151,9 +152,9 @@ func TestIntegration_EpochBump_ViaOrchestrator(t *testing.T) { RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, }) - // Epoch bumps mid-recovery. + // Epoch bumps mid-recovery — all via orchestrator. o.InvalidateEpoch(2) - o.Registry.Sender("r1").UpdateEpoch(2) + o.UpdateSenderEpoch("r1", 2) // Old session is dead — ExecuteRecovery should fail. result := o.ExecuteRecovery("r1", 100, &primary) @@ -174,21 +175,51 @@ func TestIntegration_EpochBump_ViaOrchestrator(t *testing.T) { if result2.Outcome != OutcomeZeroGap { t.Fatalf("epoch 2: %s", result2.Outcome) } - o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID()) + if result2.FinalState != StateInSync { + t.Fatalf("state=%s", result2.FinalState) + } - if o.Registry.Sender("r1").State() != StateInSync { - t.Fatalf("state=%s", o.Registry.Sender("r1").State()) + // Log should show per-replica session invalidation. + hasPerReplicaInvalidation := false + for _, e := range o.Log.EventsFor("r1") { + if e.Event == "session_invalidated" { + hasPerReplicaInvalidation = true + } } + if !hasPerReplicaInvalidation { + t.Fatal("log should contain per-replica session_invalidated event") + } +} + +func TestIntegration_EndpointChange_LogsInvalidation(t *testing.T) { + o := NewRecoveryOrchestrator() + + o.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) + + // Address changes in next assignment — should log invalidation. + o.ProcessAssignment(AssignmentIntent{ + Replicas: []ReplicaAssignment{ + {ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9444", Version: 2}}, + }, + Epoch: 1, + RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp}, + }) - // Log should show epoch invalidation. + // Check per-replica invalidation event. hasInvalidation := false - for _, e := range o.Log.Events() { - if e.Event == "epoch_invalidation" { + for _, e := range o.Log.EventsFor("r1") { + if e.Event == "session_invalidated" { hasInvalidation = true } } if !hasInvalidation { - t.Fatal("log should contain epoch_invalidation event") + t.Fatal("endpoint change should produce per-replica session_invalidated event") } } @@ -215,12 +246,11 @@ func TestIntegration_MultiReplica_ViaOrchestrator(t *testing.T) { }, }) - // r1: zero-gap. + // r1: zero-gap — orchestrator completes automatically. r1 := o.ExecuteRecovery("r1", 100, &primary) - if r1.Outcome != OutcomeZeroGap { - t.Fatalf("r1: %s", r1.Outcome) + if r1.Outcome != OutcomeZeroGap || r1.FinalState != StateInSync { + t.Fatalf("r1: outcome=%s state=%s", r1.Outcome, r1.FinalState) } - o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID()) // r2: catch-up. r2 := o.ExecuteRecovery("r2", 60, &primary) diff --git a/sw-block/engine/replication/orchestrator.go b/sw-block/engine/replication/orchestrator.go index a80c8ad48..722bc0170 100644 --- a/sw-block/engine/replication/orchestrator.go +++ b/sw-block/engine/replication/orchestrator.go @@ -26,14 +26,43 @@ func NewRecoveryOrchestrator() *RecoveryOrchestrator { } // ProcessAssignment applies an assignment intent and logs the result. +// Detects endpoint-change invalidations automatically. func (o *RecoveryOrchestrator) ProcessAssignment(intent AssignmentIntent) AssignmentResult { + // Snapshot pre-assignment session state for invalidation detection. + type preState struct { + hadSession bool + sessionID uint64 + } + pre := map[string]preState{} + for _, ra := range intent.Replicas { + if s := o.Registry.Sender(ra.ReplicaID); s != nil { + pre[ra.ReplicaID] = preState{ + hadSession: s.HasActiveSession(), + sessionID: s.SessionID(), + } + } + } + result := o.Registry.ApplyAssignment(intent) + for _, id := range result.Added { o.Log.Record(id, 0, "sender_added", "") } for _, id := range result.Removed { o.Log.Record(id, 0, "sender_removed", "") } + + // Detect endpoint-change invalidations: if the session ID changed + // (old session was invalidated and possibly replaced), log the old one. + for id, p := range pre { + if p.hadSession { + s := o.Registry.Sender(id) + if s != nil && s.SessionID() != p.sessionID { + o.Log.Record(id, p.sessionID, "session_invalidated", "endpoint_changed") + } + } + } + for _, id := range result.SessionsCreated { s := o.Registry.Sender(id) o.Log.Record(id, s.SessionID(), "session_created", "") @@ -94,6 +123,14 @@ func (o *RecoveryOrchestrator) ExecuteRecovery(replicaID string, replicaFlushedL return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: StateNeedsRebuild} } + // Zero-gap: complete immediately (no catch-up needed). + if outcome == OutcomeZeroGap { + if s.CompleteSessionByID(sessID) { + o.Log.Record(replicaID, sessID, "completed", "zero_gap") + return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: StateInSync} + } + } + return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: s.State()} } @@ -176,11 +213,48 @@ func (o *RecoveryOrchestrator) CompleteRebuild(replicaID string, history *Retain return nil } -// InvalidateEpoch invalidates all stale sessions and logs the event. +// UpdateSenderEpoch advances a specific sender's epoch via the orchestrator. +// Logs the transition and any session invalidation. +func (o *RecoveryOrchestrator) UpdateSenderEpoch(replicaID string, newEpoch uint64) { + s := o.Registry.Sender(replicaID) + if s == nil { + return + } + hadSession := s.HasActiveSession() + oldSessID := s.SessionID() + s.UpdateEpoch(newEpoch) + if hadSession && !s.HasActiveSession() { + o.Log.Record(replicaID, oldSessID, "session_invalidated", + fmt.Sprintf("epoch_advanced_to_%d", newEpoch)) + } +} + +// InvalidateEpoch invalidates all stale sessions with per-replica logging. func (o *RecoveryOrchestrator) InvalidateEpoch(newEpoch uint64) int { + // Collect per-replica state before invalidation. + type pre struct { + id string + sess uint64 + had bool + } + var senders []pre + for _, s := range o.Registry.All() { + senders = append(senders, pre{ + id: s.ReplicaID(), + sess: s.SessionID(), + had: s.HasActiveSession(), + }) + } + count := o.Registry.InvalidateEpoch(newEpoch) - if count > 0 { - o.Log.Record("", 0, "epoch_invalidation", fmt.Sprintf("epoch=%d invalidated=%d", newEpoch, count)) + + // Log per-replica invalidations. + for _, p := range senders { + s := o.Registry.Sender(p.id) + if s != nil && p.had && !s.HasActiveSession() { + o.Log.Record(p.id, p.sess, "session_invalidated", + fmt.Sprintf("epoch_bump_to_%d", newEpoch)) + } } return count }