Browse Source

fix: orchestrator owns zero-gap completion and per-replica invalidation logging

Zero-gap completion:
- ExecuteRecovery auto-completes zero-gap sessions (no sender call needed)
- RecoveryResult.FinalState = StateInSync for zero-gap

Epoch transition:
- UpdateSenderEpoch: orchestrator-owned epoch advancement with auto-log
- InvalidateEpoch: per-replica session_invalidated events (not aggregate)

Endpoint-change invalidation:
- ProcessAssignment detects session ID change from endpoint update
- Logs per-replica session_invalidated with "endpoint_changed" reason

All integration tests now use orchestrator exclusively for core lifecycle.
No direct sender API calls for recovery execution in integration tests.

1 new test: EndpointChange_LogsInvalidation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
5cdee4a011
  1. 62
      sw-block/engine/replication/integration_test.go
  2. 80
      sw-block/engine/replication/orchestrator.go

62
sw-block/engine/replication/integration_test.go

@ -53,13 +53,14 @@ func TestIntegration_ChangedAddress_ViaOrchestrator(t *testing.T) {
t.Fatalf("endpoint not updated: %s", s.Endpoint().DataAddr)
}
// Zero-gap recovery on new endpoint.
// Zero-gap recovery on new endpoint — orchestrator handles completion.
result2 := o.ExecuteRecovery("vol1-r1", 100, &primary)
if result2.Outcome != OutcomeZeroGap {
t.Fatalf("outcome=%s", result2.Outcome)
}
// Zero-gap completes in handshake phase.
s.CompleteSessionByID(s.SessionID())
if result2.FinalState != StateInSync {
t.Fatalf("zero-gap should complete to InSync, got %s", result2.FinalState)
}
// Verify log has events from both cycles.
events := o.Log.EventsFor("vol1-r1")
@ -151,9 +152,9 @@ func TestIntegration_EpochBump_ViaOrchestrator(t *testing.T) {
RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
})
// Epoch bumps mid-recovery.
// Epoch bumps mid-recovery — all via orchestrator.
o.InvalidateEpoch(2)
o.Registry.Sender("r1").UpdateEpoch(2)
o.UpdateSenderEpoch("r1", 2)
// Old session is dead — ExecuteRecovery should fail.
result := o.ExecuteRecovery("r1", 100, &primary)
@ -174,21 +175,51 @@ func TestIntegration_EpochBump_ViaOrchestrator(t *testing.T) {
if result2.Outcome != OutcomeZeroGap {
t.Fatalf("epoch 2: %s", result2.Outcome)
}
o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID())
if result2.FinalState != StateInSync {
t.Fatalf("state=%s", result2.FinalState)
}
if o.Registry.Sender("r1").State() != StateInSync {
t.Fatalf("state=%s", o.Registry.Sender("r1").State())
// Log should show per-replica session invalidation.
hasPerReplicaInvalidation := false
for _, e := range o.Log.EventsFor("r1") {
if e.Event == "session_invalidated" {
hasPerReplicaInvalidation = true
}
}
if !hasPerReplicaInvalidation {
t.Fatal("log should contain per-replica session_invalidated event")
}
}
func TestIntegration_EndpointChange_LogsInvalidation(t *testing.T) {
o := NewRecoveryOrchestrator()
o.ProcessAssignment(AssignmentIntent{
Replicas: []ReplicaAssignment{
{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
})
// Address changes in next assignment — should log invalidation.
o.ProcessAssignment(AssignmentIntent{
Replicas: []ReplicaAssignment{
{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9444", Version: 2}},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
})
// Log should show epoch invalidation.
// Check per-replica invalidation event.
hasInvalidation := false
for _, e := range o.Log.Events() {
if e.Event == "epoch_invalidation" {
for _, e := range o.Log.EventsFor("r1") {
if e.Event == "session_invalidated" {
hasInvalidation = true
}
}
if !hasInvalidation {
t.Fatal("log should contain epoch_invalidation event")
t.Fatal("endpoint change should produce per-replica session_invalidated event")
}
}
@ -215,12 +246,11 @@ func TestIntegration_MultiReplica_ViaOrchestrator(t *testing.T) {
},
})
// r1: zero-gap.
// r1: zero-gap — orchestrator completes automatically.
r1 := o.ExecuteRecovery("r1", 100, &primary)
if r1.Outcome != OutcomeZeroGap {
t.Fatalf("r1: %s", r1.Outcome)
if r1.Outcome != OutcomeZeroGap || r1.FinalState != StateInSync {
t.Fatalf("r1: outcome=%s state=%s", r1.Outcome, r1.FinalState)
}
o.Registry.Sender("r1").CompleteSessionByID(o.Registry.Sender("r1").SessionID())
// r2: catch-up.
r2 := o.ExecuteRecovery("r2", 60, &primary)

80
sw-block/engine/replication/orchestrator.go

@ -26,14 +26,43 @@ func NewRecoveryOrchestrator() *RecoveryOrchestrator {
}
// ProcessAssignment applies an assignment intent and logs the result.
// Detects endpoint-change invalidations automatically.
func (o *RecoveryOrchestrator) ProcessAssignment(intent AssignmentIntent) AssignmentResult {
// Snapshot pre-assignment session state for invalidation detection.
type preState struct {
hadSession bool
sessionID uint64
}
pre := map[string]preState{}
for _, ra := range intent.Replicas {
if s := o.Registry.Sender(ra.ReplicaID); s != nil {
pre[ra.ReplicaID] = preState{
hadSession: s.HasActiveSession(),
sessionID: s.SessionID(),
}
}
}
result := o.Registry.ApplyAssignment(intent)
for _, id := range result.Added {
o.Log.Record(id, 0, "sender_added", "")
}
for _, id := range result.Removed {
o.Log.Record(id, 0, "sender_removed", "")
}
// Detect endpoint-change invalidations: if the session ID changed
// (old session was invalidated and possibly replaced), log the old one.
for id, p := range pre {
if p.hadSession {
s := o.Registry.Sender(id)
if s != nil && s.SessionID() != p.sessionID {
o.Log.Record(id, p.sessionID, "session_invalidated", "endpoint_changed")
}
}
}
for _, id := range result.SessionsCreated {
s := o.Registry.Sender(id)
o.Log.Record(id, s.SessionID(), "session_created", "")
@ -94,6 +123,14 @@ func (o *RecoveryOrchestrator) ExecuteRecovery(replicaID string, replicaFlushedL
return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: StateNeedsRebuild}
}
// Zero-gap: complete immediately (no catch-up needed).
if outcome == OutcomeZeroGap {
if s.CompleteSessionByID(sessID) {
o.Log.Record(replicaID, sessID, "completed", "zero_gap")
return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: StateInSync}
}
}
return RecoveryResult{ReplicaID: replicaID, Outcome: outcome, Proof: proof, FinalState: s.State()}
}
@ -176,11 +213,48 @@ func (o *RecoveryOrchestrator) CompleteRebuild(replicaID string, history *Retain
return nil
}
// InvalidateEpoch invalidates all stale sessions and logs the event.
// UpdateSenderEpoch advances a specific sender's epoch via the orchestrator.
// Logs the transition and any session invalidation.
func (o *RecoveryOrchestrator) UpdateSenderEpoch(replicaID string, newEpoch uint64) {
	sender := o.Registry.Sender(replicaID)
	if sender == nil {
		return
	}
	// Snapshot session state before the bump so we can tell whether the
	// epoch advance killed an active session.
	prevSessID := sender.SessionID()
	wasActive := sender.HasActiveSession()
	sender.UpdateEpoch(newEpoch)
	if wasActive && !sender.HasActiveSession() {
		reason := fmt.Sprintf("epoch_advanced_to_%d", newEpoch)
		o.Log.Record(replicaID, prevSessID, "session_invalidated", reason)
	}
}
// InvalidateEpoch invalidates all stale sessions with per-replica logging.
func (o *RecoveryOrchestrator) InvalidateEpoch(newEpoch uint64) int {
// Collect per-replica state before invalidation.
type pre struct {
id string
sess uint64
had bool
}
var senders []pre
for _, s := range o.Registry.All() {
senders = append(senders, pre{
id: s.ReplicaID(),
sess: s.SessionID(),
had: s.HasActiveSession(),
})
}
count := o.Registry.InvalidateEpoch(newEpoch)
if count > 0 {
o.Log.Record("", 0, "epoch_invalidation", fmt.Sprintf("epoch=%d invalidated=%d", newEpoch, count))
// Log per-replica invalidations.
for _, p := range senders {
s := o.Registry.Sender(p.id)
if s != nil && p.had && !s.HasActiveSession() {
o.Log.Record(p.id, p.sess, "session_invalidated",
fmt.Sprintf("epoch_bump_to_%d", newEpoch))
}
}
return count
}
Loading…
Cancel
Save