Browse Source

fix: separate stable ReplicaID from Endpoint in registry

Registry is now keyed by stable ReplicaID, not by address.
DataAddr changes preserve sender identity — the core V2 invariant.

Changes:
- ReplicaAssignment{ReplicaID, Endpoint} replaces map[string]Endpoint
- AssignmentIntent.Replicas uses []ReplicaAssignment
- Registry.Reconcile takes []ReplicaAssignment
- Tests use stable IDs ("replica-1", "r1") independent of addresses

New test: ChangedDataAddr_PreservesSenderIdentity
- Same ReplicaID, different DataAddr (10.0.0.1 → 10.0.0.2)
- Sender pointer preserved, session invalidated, new session attached
- This is the exact V1/V1.5 regression that V2 must fix

doc.go: clarified Slice 1 core vs carried-forward files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
61e9408261
  1. 5
      sw-block/engine/replication/doc.go
  2. 189
      sw-block/engine/replication/ownership_test.go
  3. 32
      sw-block/engine/replication/registry.go

5
sw-block/engine/replication/doc.go

@@ -12,12 +12,15 @@
// - Rebuild is a separate, exclusive sender-owned execution path
// - Completion requires convergence (catch-up) or ReadyToComplete (rebuild)
//
// File layout (Slice 1):
// File layout:
//
// Slice 1 core (ownership/fencing):
// types.go — Endpoint, ReplicaState, SessionKind, SessionPhase
// sender.go — Sender: per-replica owner with execution APIs
// session.go — Session: recovery lifecycle with FSM phases
// registry.go — Registry: sender group with reconcile + assignment intent
//
// Carried forward from prototype (accepted in Phase 4.5):
// budget.go — CatchUpBudget: bounded catch-up enforcement
// rebuild.go — RebuildState: rebuild execution FSM
// outcome.go — HandshakeResult, RecoveryOutcome classification

189
sw-block/engine/replication/ownership_test.go

@@ -6,92 +6,122 @@ import "testing"
// Phase 05 Slice 1: Engine ownership/fencing tests
// ============================================================
// replicas builds a ReplicaAssignment list from a map of stable
// ReplicaID -> Endpoint. Result order is unspecified because Go map
// iteration order is randomized; callers (tests) do not rely on order.
func replicas(m map[string]Endpoint) []ReplicaAssignment {
	// Pre-size to the known final length to avoid repeated append growth.
	out := make([]ReplicaAssignment, 0, len(m))
	for id, ep := range m {
		out = append(out, ReplicaAssignment{ReplicaID: id, Endpoint: ep})
	}
	return out
}
// --- Changed-address invalidation (A10) ---
func TestEngine_ChangedAddress_InvalidatesSession(t *testing.T) {
func TestEngine_ChangedDataAddr_PreservesSenderIdentity(t *testing.T) {
// THE core V2 test: DataAddr changes but stable ReplicaID stays.
// Sender must survive. Session must be invalidated (endpoint changed).
r := NewRegistry()
r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", CtrlAddr: "r1:9334", Version: 1},
Replicas: []ReplicaAssignment{
{ReplicaID: "replica-1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9334", Version: 1}},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp},
RecoveryTargets: map[string]SessionKind{"replica-1": SessionCatchUp},
})
s := r.Sender("r1:9333")
s := r.Sender("replica-1")
sessID := s.SessionID()
s.BeginConnect(sessID)
r.Reconcile(map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", CtrlAddr: "r1:9445", Version: 2},
// DataAddr changes (replica restarted on different port/IP).
r.Reconcile([]ReplicaAssignment{
{ReplicaID: "replica-1", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", Version: 2}},
}, 1)
// Sender identity preserved (same pointer, same ReplicaID).
if r.Sender("replica-1") != s {
t.Fatal("sender identity must be preserved across DataAddr change")
}
// Session invalidated (endpoint changed).
if s.HasActiveSession() {
t.Fatal("session should be invalidated")
t.Fatal("session should be invalidated by DataAddr change")
}
if s.State() != StateDisconnected {
t.Fatalf("state=%s", s.State())
// Endpoint updated.
if s.Endpoint().DataAddr != "10.0.0.2:9333" {
t.Fatalf("endpoint not updated: %s", s.Endpoint().DataAddr)
}
// New session can be attached on the updated endpoint.
result := r.ApplyAssignment(AssignmentIntent{
Replicas: []ReplicaAssignment{
{ReplicaID: "replica-1", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", Version: 2}},
},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"replica-1": SessionCatchUp},
})
if len(result.SessionsCreated) != 1 {
t.Fatalf("should create new session: %v", result)
}
if s.SessionID() == sessID {
t.Fatal("new session should have different ID")
}
t.Logf("DataAddr changed: sender preserved, old session invalidated, new session attached")
}
func TestEngine_ChangedAddress_NewSessionAfterUpdate(t *testing.T) {
func TestEngine_ChangedCtrlAddr_InvalidatesSession(t *testing.T) {
r := NewRegistry()
r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 1},
},
Replicas: replicas(map[string]Endpoint{
"r1": {DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9334", Version: 1},
}),
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp},
RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
})
s := r.Sender("r1:9333")
oldID := s.SessionID()
s := r.Sender("r1")
sessID := s.SessionID()
s.BeginConnect(sessID)
r.Reconcile(map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 2},
}, 1)
result := r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{"r1:9333": {DataAddr: "r1:9333", Version: 2}},
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp},
})
r.Reconcile(replicas(map[string]Endpoint{
"r1": {DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9445", Version: 2},
}), 1)
if len(result.SessionsCreated) != 1 {
t.Fatalf("should create new session: %v", result)
if s.HasActiveSession() {
t.Fatal("CtrlAddr change should invalidate session")
}
if s.SessionID() == oldID {
t.Fatal("should have different session ID")
if s.State() != StateDisconnected {
t.Fatalf("state=%s", s.State())
}
}
// --- Stale-session rejection (A3) ---
func TestEngine_StaleSessionID_RejectedAtAllAPIs(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
staleID, _ := s.AttachSession(1, SessionCatchUp)
s.UpdateEpoch(2)
s.AttachSession(2, SessionCatchUp)
if err := s.BeginConnect(staleID); err == nil {
t.Fatal("stale ID: BeginConnect should reject")
t.Fatal("stale BeginConnect should reject")
}
if err := s.RecordHandshake(staleID, 0, 10); err == nil {
t.Fatal("stale ID: RecordHandshake should reject")
t.Fatal("stale RecordHandshake should reject")
}
if err := s.BeginCatchUp(staleID); err == nil {
t.Fatal("stale ID: BeginCatchUp should reject")
t.Fatal("stale BeginCatchUp should reject")
}
if err := s.RecordCatchUpProgress(staleID, 5); err == nil {
t.Fatal("stale ID: RecordCatchUpProgress should reject")
t.Fatal("stale RecordCatchUpProgress should reject")
}
if s.CompleteSessionByID(staleID) {
t.Fatal("stale ID: CompleteSessionByID should reject")
t.Fatal("stale CompleteSessionByID should reject")
}
}
func TestEngine_StaleCompletion_AfterSupersede(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
id1, _ := s.AttachSession(1, SessionCatchUp)
s.UpdateEpoch(2)
@ -100,12 +130,9 @@ func TestEngine_StaleCompletion_AfterSupersede(t *testing.T) {
if s.CompleteSessionByID(id1) {
t.Fatal("stale completion must be rejected")
}
if s.HasActiveSession() != true {
if !s.HasActiveSession() {
t.Fatal("new session should be active")
}
if s.State() == StateInSync {
t.Fatal("sender should not be InSync from stale completion")
}
}
// --- Epoch-bump invalidation (A3) ---
@ -113,37 +140,34 @@ func TestEngine_StaleCompletion_AfterSupersede(t *testing.T) {
func TestEngine_EpochBump_InvalidatesAllSessions(t *testing.T) {
r := NewRegistry()
r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 1},
"r2:9333": {DataAddr: "r2:9333", Version: 1},
},
Replicas: replicas(map[string]Endpoint{
"r1": {DataAddr: "r1:9333", Version: 1},
"r2": {DataAddr: "r2:9333", Version: 1},
}),
Epoch: 1,
RecoveryTargets: map[string]SessionKind{
"r1:9333": SessionCatchUp,
"r2:9333": SessionCatchUp,
"r1": SessionCatchUp,
"r2": SessionCatchUp,
},
})
count := r.InvalidateEpoch(2)
if count != 2 {
t.Fatalf("should invalidate 2, got %d", count)
}
if r.Sender("r1:9333").HasActiveSession() || r.Sender("r2:9333").HasActiveSession() {
t.Fatal("both sessions should be invalidated")
t.Fatalf("invalidated=%d, want 2", count)
}
}
func TestEngine_EpochBump_StaleAssignment_Rejected(t *testing.T) {
r := NewRegistry()
r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{"r1:9333": {DataAddr: "r1:9333", Version: 1}},
Epoch: 2,
Replicas: replicas(map[string]Endpoint{"r1": {DataAddr: "r1:9333", Version: 1}}),
Epoch: 2,
})
result := r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{"r1:9333": {DataAddr: "r1:9333", Version: 1}},
Replicas: replicas(map[string]Endpoint{"r1": {DataAddr: "r1:9333", Version: 1}}),
Epoch: 1,
RecoveryTargets: map[string]SessionKind{"r1:9333": SessionCatchUp},
RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
})
if len(result.SessionsFailed) != 1 {
@ -154,7 +178,7 @@ func TestEngine_EpochBump_StaleAssignment_Rejected(t *testing.T) {
// --- Rebuild exclusivity ---
func TestEngine_Rebuild_CatchUpAPIs_Rejected(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sessID, _ := s.AttachSession(1, SessionRebuild)
s.BeginConnect(sessID)
s.RecordHandshake(sessID, 0, 100)
@ -162,16 +186,13 @@ func TestEngine_Rebuild_CatchUpAPIs_Rejected(t *testing.T) {
if err := s.BeginCatchUp(sessID); err == nil {
t.Fatal("rebuild: BeginCatchUp should reject")
}
if err := s.RecordCatchUpProgress(sessID, 50); err == nil {
t.Fatal("rebuild: RecordCatchUpProgress should reject")
}
if s.CompleteSessionByID(sessID) {
t.Fatal("rebuild: catch-up completion should reject")
}
}
func TestEngine_Rebuild_FullLifecycle(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sessID, _ := s.AttachSession(1, SessionRebuild)
s.BeginConnect(sessID)
@ -193,7 +214,7 @@ func TestEngine_Rebuild_FullLifecycle(t *testing.T) {
// --- Bounded catch-up ---
func TestEngine_FrozenTarget_RejectsChase(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sessID, _ := s.AttachSession(1, SessionCatchUp)
s.BeginConnect(sessID)
@ -206,7 +227,7 @@ func TestEngine_FrozenTarget_RejectsChase(t *testing.T) {
}
func TestEngine_BudgetViolation_Escalates(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sessID, _ := s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxDurationTicks: 5}))
s.BeginConnect(sessID)
@ -223,31 +244,19 @@ func TestEngine_BudgetViolation_Escalates(t *testing.T) {
}
}
// --- Encapsulation: no direct state mutation ---
// --- Encapsulation ---
func TestEngine_Encapsulation_SnapshotIsReadOnly(t *testing.T) {
s := NewSender("r1:9333", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
s := NewSender("r1", Endpoint{DataAddr: "r1:9333", Version: 1}, 1)
sessID, _ := s.AttachSession(1, SessionCatchUp)
snap := s.SessionSnapshot()
if snap == nil || !snap.Active {
t.Fatal("should have active session snapshot")
}
// Mutating the snapshot does not affect the sender.
snap.Phase = PhaseCompleted
snap.Active = false
// Sender's session is still active.
if !s.HasActiveSession() {
t.Fatal("sender should still have active session after snapshot mutation")
}
snap2 := s.SessionSnapshot()
if snap2.Phase == PhaseCompleted {
t.Fatal("snapshot mutation should not leak back to sender")
t.Fatal("snapshot mutation should not affect sender")
}
// Can still execute on the real session.
if err := s.BeginConnect(sessID); err != nil {
t.Fatalf("execution should still work: %v", err)
}
@ -258,21 +267,21 @@ func TestEngine_Encapsulation_SnapshotIsReadOnly(t *testing.T) {
func TestEngine_E2E_ThreeReplicas_ThreeOutcomes(t *testing.T) {
r := NewRegistry()
r.ApplyAssignment(AssignmentIntent{
Endpoints: map[string]Endpoint{
"r1:9333": {DataAddr: "r1:9333", Version: 1},
"r2:9333": {DataAddr: "r2:9333", Version: 1},
"r3:9333": {DataAddr: "r3:9333", Version: 1},
},
Replicas: replicas(map[string]Endpoint{
"r1": {DataAddr: "r1:9333", Version: 1},
"r2": {DataAddr: "r2:9333", Version: 1},
"r3": {DataAddr: "r3:9333", Version: 1},
}),
Epoch: 1,
RecoveryTargets: map[string]SessionKind{
"r1:9333": SessionCatchUp,
"r2:9333": SessionCatchUp,
"r3:9333": SessionCatchUp,
"r1": SessionCatchUp,
"r2": SessionCatchUp,
"r3": SessionCatchUp,
},
})
// r1: zero-gap.
r1 := r.Sender("r1:9333")
r1 := r.Sender("r1")
id1 := r1.SessionID()
r1.BeginConnect(id1)
o1, _ := r1.RecordHandshakeWithOutcome(id1, HandshakeResult{
@ -284,7 +293,7 @@ func TestEngine_E2E_ThreeReplicas_ThreeOutcomes(t *testing.T) {
r1.CompleteSessionByID(id1)
// r2: catch-up.
r2 := r.Sender("r2:9333")
r2 := r.Sender("r2")
id2 := r2.SessionID()
r2.BeginConnect(id2)
o2, _ := r2.RecordHandshakeWithOutcome(id2, HandshakeResult{
@ -298,7 +307,7 @@ func TestEngine_E2E_ThreeReplicas_ThreeOutcomes(t *testing.T) {
r2.CompleteSessionByID(id2)
// r3: needs rebuild.
r3 := r.Sender("r3:9333")
r3 := r.Sender("r3")
id3 := r3.SessionID()
r3.BeginConnect(id3)
o3, _ := r3.RecordHandshakeWithOutcome(id3, HandshakeResult{
@ -308,12 +317,6 @@ func TestEngine_E2E_ThreeReplicas_ThreeOutcomes(t *testing.T) {
t.Fatalf("r3: %s", o3)
}
if r1.State() != StateInSync || r2.State() != StateInSync {
t.Fatalf("r1=%s r2=%s", r1.State(), r2.State())
}
if r3.State() != StateNeedsRebuild {
t.Fatalf("r3=%s", r3.State())
}
if r.InSyncCount() != 2 {
t.Fatalf("in_sync=%d", r.InSyncCount())
}

32
sw-block/engine/replication/registry.go

@@ -5,11 +5,18 @@ import (
"sync"
)
// ReplicaAssignment describes one replica's identity + endpoint in an assignment.
// ReplicaID is the stable key the Registry uses to track the replica across
// address changes; Endpoint carries the current network addresses, which may
// change (e.g., after a replica restart) without changing identity.
type ReplicaAssignment struct {
	ReplicaID string   // stable identity (e.g., volume-scoped replica name)
	Endpoint  Endpoint // current network address (may change)
}
// AssignmentIntent represents a coordinator-driven assignment update.
// Replicas are identified by stable ReplicaID, not by address.
type AssignmentIntent struct {
Endpoints map[string]Endpoint
Replicas []ReplicaAssignment // desired replica set with stable IDs
Epoch uint64
RecoveryTargets map[string]SessionKind
RecoveryTargets map[string]SessionKind // keyed by ReplicaID
}
// AssignmentResult records the outcome of applying an assignment.
@@ -32,19 +39,30 @@ func NewRegistry() *Registry {
return &Registry{senders: map[string]*Sender{}}
}
// Reconcile diffs current senders against new endpoints.
func (r *Registry) Reconcile(endpoints map[string]Endpoint, epoch uint64) (added, removed []string) {
// Reconcile diffs current senders against new replicas (by stable ReplicaID).
// Matching senders are preserved with endpoint/epoch update.
// Removed senders are stopped. New senders are created.
func (r *Registry) Reconcile(replicas []ReplicaAssignment, epoch uint64) (added, removed []string) {
r.mu.Lock()
defer r.mu.Unlock()
// Build target set keyed by stable ReplicaID.
target := make(map[string]Endpoint, len(replicas))
for _, ra := range replicas {
target[ra.ReplicaID] = ra.Endpoint
}
// Stop and remove senders not in the new set.
for id, s := range r.senders {
if _, keep := endpoints[id]; !keep {
if _, keep := target[id]; !keep {
s.Stop()
delete(r.senders, id)
removed = append(removed, id)
}
}
for id, ep := range endpoints {
// Add new senders; update endpoint+epoch for existing.
for id, ep := range target {
if existing, ok := r.senders[id]; ok {
existing.UpdateEndpoint(ep)
existing.UpdateEpoch(epoch)
@@ -61,7 +79,7 @@ func (r *Registry) Reconcile(endpoints map[string]Endpoint, epoch uint64) (added
// ApplyAssignment reconciles topology and creates recovery sessions.
func (r *Registry) ApplyAssignment(intent AssignmentIntent) AssignmentResult {
var result AssignmentResult
result.Added, result.Removed = r.Reconcile(intent.Endpoints, intent.Epoch)
result.Added, result.Removed = r.Reconcile(intent.Replicas, intent.Epoch)
if intent.RecoveryTargets == nil {
return result

Loading…
Cancel
Save