Browse Source

feat: add integration closure and observability (Phase 05 Slice 4)

New files:
- observe.go: RegistryStatus, SenderStatus, RecoveryLog for debugging
- integration_test.go: V2-boundary integration tests through real
  engine entry path

Observability:
- Registry.Status() returns full snapshot: per-sender state, session
  snapshots, counts by category (InSync, Recovering, Rebuilding)
- RecoveryLog: append-only event log for recovery lifecycle debugging

Integration tests (6):
- ChangedAddress_FullFlow: initial recovery → address change →
  sender preserved → new session → recovery with proof
- NeedsRebuild_ThenRebuildAssignment: catch-up fails → NeedsRebuild
  → rebuild assignment → history-driven source → InSync
- EpochBump_DuringRecovery: mid-recovery epoch bump → old session
  rejected → new assignment at new epoch → InSync
- MultiReplica_MixedOutcomes: 3 replicas, 3 outcomes via
  RetainedHistory proofs, registry status verified
- RegistryStatus_Snapshot: observability snapshot structure
- RecoveryLog: event recording and filtering

Engine module at 54 tests (12 + 18 + 18 + 6).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 2 days ago
parent
commit
7436b3b79c
  1. 323
      sw-block/engine/replication/integration_test.go
  2. 96
      sw-block/engine/replication/observe.go

323
sw-block/engine/replication/integration_test.go

@ -0,0 +1,323 @@
package replication
import (
"fmt"
"testing"
)
// ============================================================
// Phase 05 Slice 4: Integration closure
//
// Tests validate V2-boundary cases through the real engine entry
// path (assignment intent → recovery → completion/escalation),
// with observability verification.
// ============================================================
// --- V2 Boundary 1: Changed-address recovery through assignment ---

// TestIntegration_ChangedAddress_FullFlow drives one replica through an
// initial catch-up recovery, then a restart on a new address, verifying
// that the sender object survives the endpoint change and that a fresh
// session completes with a zero-gap handshake.
func TestIntegration_ChangedAddress_FullFlow(t *testing.T) {
	log := NewRecoveryLog()
	r := NewRegistry()
	primary := RetainedHistory{
		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
	}

	// Initial assignment.
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "vol1-replica1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", CtrlAddr: "10.0.0.1:9334", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"vol1-replica1": SessionCatchUp},
	})
	s := r.Sender("vol1-replica1")
	id := s.SessionID()
	s.BeginConnect(id)
	log.Record("vol1-replica1", id, "connect", "initial")
	// Fix: the handshake error was silently discarded; check it so a
	// failure surfaces here instead of as a confusing later state.
	outcome, proof, err := s.RecordHandshakeFromHistory(id, 80, &primary)
	if err != nil {
		t.Fatalf("initial handshake: %v", err)
	}
	log.Record("vol1-replica1", id, "handshake", fmt.Sprintf("outcome=%s proof=%s", outcome, proof.Reason))
	s.BeginCatchUp(id)
	s.RecordCatchUpProgress(id, 100)
	s.CompleteSessionByID(id)
	log.Record("vol1-replica1", id, "completed", "in_sync")
	if s.State() != StateInSync {
		t.Fatalf("state=%s", s.State())
	}

	// Replica restarts on new address — assignment with updated endpoint.
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "vol1-replica1", Endpoint: Endpoint{DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", Version: 2}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"vol1-replica1": SessionCatchUp},
	})

	// Sender identity preserved.
	if r.Sender("vol1-replica1") != s {
		t.Fatal("sender identity must survive address change")
	}

	// New session on new endpoint.
	id2 := s.SessionID()
	if id2 == id {
		t.Fatal("should have new session ID")
	}
	if s.Endpoint().DataAddr != "10.0.0.2:9333" {
		t.Fatalf("endpoint not updated: %s", s.Endpoint().DataAddr)
	}
	s.BeginConnect(id2)
	log.Record("vol1-replica1", id2, "connect", "after address change")
	o2, _, err := s.RecordHandshakeFromHistory(id2, 100, &primary)
	if err != nil {
		t.Fatalf("handshake after address change: %v", err)
	}
	if o2 != OutcomeZeroGap {
		t.Fatalf("o2=%s", o2)
	}
	s.CompleteSessionByID(id2)
	log.Record("vol1-replica1", id2, "completed", "zero_gap after address change")

	// Observability: connect/handshake/completed from session 1 plus
	// connect/completed from session 2.
	events := log.EventsFor("vol1-replica1")
	if len(events) < 4 {
		t.Fatalf("expected at least 4 events, got %d", len(events))
	}
	t.Logf("changed-address: %d recovery events logged", len(events))
}
// --- V2 Boundary 2: NeedsRebuild → rebuild through assignment ---

// TestIntegration_NeedsRebuild_ThenRebuildAssignment: a catch-up whose
// gap falls outside retained history escalates to NeedsRebuild; a
// coordinator rebuild assignment then drives the replica back to InSync
// via a history-selected rebuild source.
func TestIntegration_NeedsRebuild_ThenRebuildAssignment(t *testing.T) {
	r := NewRegistry()
	primary := RetainedHistory{
		HeadLSN: 100, TailLSN: 60, CommittedLSN: 100,
		CheckpointLSN: 40, CheckpointTrusted: true,
	}

	// Initial catch-up attempt fails — gap beyond retention.
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})
	s := r.Sender("r1")
	id := s.SessionID()
	s.BeginConnect(id)
	// Fix: the handshake error was silently discarded; check it.
	o, _, err := s.RecordHandshakeFromHistory(id, 30, &primary)
	if err != nil {
		t.Fatalf("handshake: %v", err)
	}
	if o != OutcomeNeedsRebuild {
		t.Fatalf("should need rebuild: %s", o)
	}

	// Registry status shows NeedsRebuild.
	status := r.Status()
	if status.Rebuilding != 1 {
		t.Fatalf("rebuilding=%d", status.Rebuilding)
	}

	// Rebuild assignment from coordinator.
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "10.0.0.1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild},
	})
	id2 := s.SessionID()
	s.BeginConnect(id2)
	// Fix: RecordHandshake returns an error that was previously dropped
	// on the floor; a rejected handshake must fail the test here.
	if err := s.RecordHandshake(id2, 0, 100); err != nil {
		t.Fatalf("rebuild handshake: %v", err)
	}
	// History-driven rebuild source: checkpoint at 40 but tail at 60 →
	// CheckpointLSN (40) < TailLSN (60) → unreplayable → full base.
	s.SelectRebuildFromHistory(id2, &primary)
	s.BeginRebuildTransfer(id2)
	s.RecordRebuildTransferProgress(id2, 100)
	s.CompleteRebuild(id2)
	if s.State() != StateInSync {
		t.Fatalf("state=%s", s.State())
	}
	status = r.Status()
	if status.InSync != 1 {
		t.Fatalf("in_sync=%d", status.InSync)
	}
}
// --- V2 Boundary 3: Epoch bump during recovery → new assignment ---

// TestIntegration_EpochBump_DuringRecovery: a failover epoch bump kills
// the in-flight session; a fresh assignment at the new epoch completes
// recovery with a zero-gap handshake.
func TestIntegration_EpochBump_DuringRecovery(t *testing.T) {
	r := NewRegistry()
	primary := RetainedHistory{HeadLSN: 100, TailLSN: 0, CommittedLSN: 100}
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})
	s := r.Sender("r1")
	id := s.SessionID()
	s.BeginConnect(id)

	// Epoch bumps (failover) mid-recovery.
	r.InvalidateEpoch(2)
	s.UpdateEpoch(2)

	// Old session dead: its handshake must now be rejected.
	if err := s.RecordHandshake(id, 0, 100); err == nil {
		t.Fatal("old session should be rejected after epoch bump")
	}

	// New assignment at epoch 2.
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           2,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})
	id2 := s.SessionID()
	s.BeginConnect(id2)
	// Fix: the handshake error was silently discarded; check it.
	o, _, err := s.RecordHandshakeFromHistory(id2, 100, &primary)
	if err != nil {
		t.Fatalf("handshake at epoch 2: %v", err)
	}
	if o != OutcomeZeroGap {
		t.Fatalf("epoch 2: %s", o)
	}
	s.CompleteSessionByID(id2)
	if s.State() != StateInSync {
		t.Fatalf("state=%s", s.State())
	}
}
// --- V2 Boundary 4: Multi-replica mixed outcomes ---

// TestIntegration_MultiReplica_MixedOutcomes drives three replicas of a
// single assignment to three different outcomes from the same retained
// history (zero-gap, catch-up, needs-rebuild) and checks the registry
// status counts.
func TestIntegration_MultiReplica_MixedOutcomes(t *testing.T) {
	r := NewRegistry()
	primary := RetainedHistory{
		HeadLSN: 100, TailLSN: 40, CommittedLSN: 100,
		CheckpointLSN: 50, CheckpointTrusted: true,
	}
	r.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
			{ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "r2:9333", Version: 1}},
			{ReplicaID: "r3", Endpoint: Endpoint{DataAddr: "r3:9333", Version: 1}},
		},
		Epoch: 1,
		RecoveryTargets: map[string]SessionKind{
			"r1": SessionCatchUp,
			"r2": SessionCatchUp,
			"r3": SessionCatchUp,
		},
	})

	// r1: zero-gap — already at the primary head (100).
	// Fix (all three replicas): handshake errors were silently discarded
	// with _; check them so failures surface at the right line.
	r1 := r.Sender("r1")
	id1 := r1.SessionID()
	r1.BeginConnect(id1)
	o1, _, err := r1.RecordHandshakeFromHistory(id1, 100, &primary)
	if err != nil {
		t.Fatalf("r1 handshake: %v", err)
	}
	if o1 != OutcomeZeroGap {
		t.Fatalf("r1: %s", o1)
	}
	r1.CompleteSessionByID(id1)

	// r2: catch-up — gap (60..100) within retained tail (60).
	r2 := r.Sender("r2")
	id2 := r2.SessionID()
	r2.BeginConnect(id2)
	o2, p2, err := r2.RecordHandshakeFromHistory(id2, 60, &primary)
	if err != nil {
		t.Fatalf("r2 handshake: %v", err)
	}
	if o2 != OutcomeCatchUp || !p2.Recoverable {
		t.Fatalf("r2: outcome=%s proof=%v", o2, p2)
	}
	r2.BeginCatchUp(id2)
	r2.RecordCatchUpProgress(id2, 100)
	r2.CompleteSessionByID(id2)

	// r3: needs rebuild — at LSN 20, behind the retained tail (60).
	r3 := r.Sender("r3")
	id3 := r3.SessionID()
	r3.BeginConnect(id3)
	o3, p3, err := r3.RecordHandshakeFromHistory(id3, 20, &primary)
	if err != nil {
		t.Fatalf("r3 handshake: %v", err)
	}
	if o3 != OutcomeNeedsRebuild || p3.Recoverable {
		t.Fatalf("r3: outcome=%s proof=%v", o3, p3)
	}

	// Registry status reflects 2 in-sync + 1 rebuilding out of 3.
	status := r.Status()
	if status.InSync != 2 {
		t.Fatalf("in_sync=%d", status.InSync)
	}
	if status.Rebuilding != 1 {
		t.Fatalf("rebuilding=%d", status.Rebuilding)
	}
	if status.TotalCount != 3 {
		t.Fatalf("total=%d", status.TotalCount)
	}
}
// --- Observability ---

// TestIntegration_RegistryStatus_Snapshot checks the shape of a registry
// snapshot before any recovery begins: both senders appear, disconnected
// and sessionless.
func TestIntegration_RegistryStatus_Snapshot(t *testing.T) {
	reg := NewRegistry()
	reg.ApplyAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
			{ReplicaID: "r2", Endpoint: Endpoint{DataAddr: "r2:9333", Version: 1}},
		},
		Epoch: 1,
	})

	snap := reg.Status()
	if snap.TotalCount != 2 {
		t.Fatalf("total=%d", snap.TotalCount)
	}
	if snap.Senders == nil || len(snap.Senders) != 2 {
		t.Fatalf("senders=%d", len(snap.Senders))
	}

	// No recovery has started, so every sender must be disconnected and
	// carry no session snapshot.
	for i := range snap.Senders {
		sender := &snap.Senders[i]
		if sender.State != StateDisconnected {
			t.Fatalf("%s: state=%s", sender.ReplicaID, sender.State)
		}
		if sender.Session != nil {
			t.Fatalf("%s: should have no session", sender.ReplicaID)
		}
	}
}
// TestIntegration_RecoveryLog exercises event recording and per-replica
// filtering on the append-only recovery log.
func TestIntegration_RecoveryLog(t *testing.T) {
	rl := NewRecoveryLog()

	// Interleave events for two replicas.
	rl.Record("r1", 1, "connect", "initial")
	rl.Record("r1", 1, "handshake", "catch-up")
	rl.Record("r2", 2, "connect", "rebuild")
	rl.Record("r1", 1, "completed", "in_sync")

	if n := len(rl.Events()); n != 4 {
		t.Fatalf("events=%d", n)
	}
	if n := len(rl.EventsFor("r1")); n != 3 {
		t.Fatalf("r1 events=%d", n)
	}
	if n := len(rl.EventsFor("r2")); n != 1 {
		t.Fatalf("r2 events=%d", n)
	}
}

96
sw-block/engine/replication/observe.go

@ -0,0 +1,96 @@
package replication
// SenderStatus is a read-only observability snapshot of one sender.
// Field values are read from the sender's accessors at snapshot time.
type SenderStatus struct {
	ReplicaID string           // identifier of the replica this sender feeds
	Endpoint  Endpoint         // endpoint the sender currently targets
	Epoch     uint64           // epoch the sender last observed
	State     ReplicaState     // replication state at snapshot time
	Stopped   bool             // whether the sender has been stopped
	Session   *SessionSnapshot // nil if no session
}
// RegistryStatus is a read-only observability snapshot of the entire registry.
//
// NOTE(review): states outside the four categories below (e.g. a
// disconnected sender) contribute only to TotalCount, so the category
// counts need not sum to TotalCount.
type RegistryStatus struct {
	Senders    []SenderStatus // one entry per registered sender
	TotalCount int            // total number of registered senders
	InSync     int            // senders in StateInSync
	Recovering int            // senders in StateConnecting or StateCatchingUp
	Degraded   int            // senders in StateDegraded
	Rebuilding int            // senders in StateNeedsRebuild
}
// Status returns a full observability snapshot of the registry.
//
// Counting rules (per the switch below): InSync ← StateInSync;
// Recovering ← StateConnecting or StateCatchingUp; Degraded ←
// StateDegraded; Rebuilding ← StateNeedsRebuild. Any other state is
// counted only in TotalCount.
func (r *Registry) Status() RegistryStatus {
	r.mu.RLock()
	defer r.mu.RUnlock()
	// NOTE(review): r.All() and the per-sender accessors below are called
	// while r.mu is held for reading. sync.RWMutex read locks are not
	// reentrant (a queued writer can deadlock a nested RLock) — confirm
	// none of these callees re-acquire r.mu.
	status := RegistryStatus{TotalCount: len(r.senders)}
	for _, s := range r.All() {
		// Snapshot each sender through its accessors.
		ss := SenderStatus{
			ReplicaID: s.ReplicaID(),
			Endpoint:  s.Endpoint(),
			Epoch:     s.Epoch(),
			State:     s.State(),
			Stopped:   s.Stopped(),
			Session:   s.SessionSnapshot(),
		}
		status.Senders = append(status.Senders, ss)
		// Categorize by state; unmatched states fall through uncounted.
		switch ss.State {
		case StateInSync:
			status.InSync++
		case StateCatchingUp, StateConnecting:
			status.Recovering++
		case StateDegraded:
			status.Degraded++
		case StateNeedsRebuild:
			status.Rebuilding++
		}
	}
	return status
}
// RecoveryEvent records a significant recovery lifecycle event for debugging.
type RecoveryEvent struct {
	ReplicaID string // replica the event belongs to
	SessionID uint64 // session the event occurred in
	Event     string // short event name, e.g. "connect", "handshake", "completed"
	Detail    string // free-form human-readable detail
}

// RecoveryLog collects recovery events for observability. It is
// append-only: events are never removed or reordered.
//
// NOTE(review): the log is not synchronized — confirm all recorders run
// on one goroutine, or add locking before sharing it.
type RecoveryLog struct {
	events []RecoveryEvent
}

// NewRecoveryLog creates an empty recovery log.
func NewRecoveryLog() *RecoveryLog {
	return &RecoveryLog{}
}

// Record adds an event to the log.
func (rl *RecoveryLog) Record(replicaID string, sessionID uint64, event, detail string) {
	rl.events = append(rl.events, RecoveryEvent{
		ReplicaID: replicaID,
		SessionID: sessionID,
		Event:     event,
		Detail:    detail,
	})
}

// Events returns all recorded events, oldest first.
//
// Fix: return a copy rather than the internal slice, so callers cannot
// mutate the log's backing storage and a later Record cannot surprise a
// caller holding a previously returned slice.
func (rl *RecoveryLog) Events() []RecoveryEvent {
	out := make([]RecoveryEvent, len(rl.events))
	copy(out, rl.events)
	return out
}

// EventsFor returns the events recorded for a specific replica, oldest
// first. The result is freshly allocated (nil if the replica has none).
func (rl *RecoveryLog) EventsFor(replicaID string) []RecoveryEvent {
	var out []RecoveryEvent
	for _, e := range rl.events {
		if e.ReplicaID == replicaID {
			out = append(out, e)
		}
	}
	return out
}
Loading…
Cancel
Save