|
|
@ -8,21 +8,16 @@ import ( |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
// ============================================================
|
|
|
// ============================================================
|
|
|
// Phase 08 P2: Integrated execution chain tests
|
|
|
|
|
|
|
|
|
// Phase 08 P2: Integrated execution chain — ONE CHAIN proofs
|
|
|
//
|
|
|
//
|
|
|
// These tests prove ONE CHAIN from assignment → engine → executor
|
|
|
|
|
|
// → v2bridge → blockvol → progress → complete.
|
|
|
|
|
|
|
|
|
// Each test proves a complete outcome through:
|
|
|
|
|
|
// assignment → engine plan → engine executor → v2bridge → blockvol
|
|
|
|
|
|
// → progress → complete → release
|
|
|
//
|
|
|
//
|
|
|
// Carry-forward:
|
|
|
|
|
|
// - V1 interim CommittedLSN=CheckpointLSN: engine classifies as
|
|
|
|
|
|
// ZeroGap pre-flush. The catch-up CHAIN is proven mechanically
|
|
|
|
|
|
// (executor→v2bridge→blockvol→progress) even though the engine
|
|
|
|
|
|
// planner entry triggers ZeroGap in V1 interim.
|
|
|
|
|
|
// - Post-checkpoint catch-up: not proven as integrated chain
|
|
|
|
|
|
// - Snapshot rebuild: stub
|
|
|
|
|
|
|
|
|
// NOT split proofs. NOT manual sender calls for the execution path.
|
|
|
// ============================================================
|
|
|
// ============================================================
|
|
|
|
|
|
|
|
|
func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *Reader, *Executor) { |
|
|
|
|
|
|
|
|
func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *Reader, *Executor, *Pinner) { |
|
|
t.Helper() |
|
|
t.Helper() |
|
|
vol := createTestVol(t) |
|
|
vol := createTestVol(t) |
|
|
t.Cleanup(func() { vol.Close() }) |
|
|
t.Cleanup(func() { vol.Close() }) |
|
|
@ -39,75 +34,105 @@ func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapte |
|
|
ca := bridge.NewControlAdapter() |
|
|
ca := bridge.NewControlAdapter() |
|
|
driver := engine.NewRecoveryDriver(sa) |
|
|
driver := engine.NewRecoveryDriver(sa) |
|
|
|
|
|
|
|
|
return driver, ca, reader, executor |
|
|
|
|
|
|
|
|
return driver, ca, reader, executor, pinner |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// --- Live catch-up chain ---
|
|
|
|
|
|
|
|
|
func makeIntent(ca *bridge.ControlAdapter, epoch uint64, role string) engine.AssignmentIntent { |
|
|
|
|
|
return ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: epoch, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: role, |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func TestP2_LiveCatchUpChain(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, executor := setupChainTest(t) |
|
|
|
|
|
|
|
|
// --- ONE CHAIN: Catch-up closure ---
|
|
|
|
|
|
|
|
|
|
|
|
func TestP2_CatchUpClosure_OneChain(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, _, pinner := setupChainTest(t) |
|
|
vol := reader.vol |
|
|
vol := reader.vol |
|
|
|
|
|
|
|
|
// Write entries to create WAL data.
|
|
|
|
|
|
|
|
|
// Phase 1: Write initial entries + flush → advances checkpoint.
|
|
|
for i := 0; i < 5; i++ { |
|
|
for i := 0; i < 5; i++ { |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|
|
} |
|
|
} |
|
|
|
|
|
vol.ForceFlush() |
|
|
|
|
|
|
|
|
state := reader.ReadState() |
|
|
|
|
|
t.Logf("chain: head=%d tail=%d committed=%d", state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN) |
|
|
|
|
|
|
|
|
// Phase 2: Write MORE entries AFTER flush → these are above checkpoint, in WAL.
|
|
|
|
|
|
for i := 5; i < 10; i++ { |
|
|
|
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Step 1: assignment arrives through P1 path.
|
|
|
|
|
|
intent := ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(intent) |
|
|
|
|
|
|
|
|
state := reader.ReadState() |
|
|
|
|
|
t.Logf("catch-up: head=%d tail=%d committed=%d checkpoint=%d", |
|
|
|
|
|
state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN, state.CheckpointLSN) |
|
|
|
|
|
|
|
|
// Step 2: engine plans recovery.
|
|
|
|
|
|
replicaLSN := uint64(0) |
|
|
|
|
|
plan, err := driver.PlanRecovery("vol1/vs2", replicaLSN) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
t.Fatal(err) |
|
|
|
|
|
|
|
|
// Precondition: head > committed (entries above checkpoint exist).
|
|
|
|
|
|
if state.WALHeadLSN <= state.CommittedLSN { |
|
|
|
|
|
t.Fatalf("need entries above checkpoint: head=%d committed=%d", state.WALHeadLSN, state.CommittedLSN) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// V1 interim: committed=0 → ZeroGap. But we still prove the WAL scan chain.
|
|
|
|
|
|
if plan.Outcome == engine.OutcomeZeroGap { |
|
|
|
|
|
t.Log("chain: V1 interim → ZeroGap (committed=0). Proving WAL scan chain directly.") |
|
|
|
|
|
|
|
|
// Step 1: assignment.
|
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) |
|
|
|
|
|
|
|
|
|
|
|
// Step 2: plan — replica at committedLSN = ZeroGap (V1 interim).
|
|
|
|
|
|
// Replica at LESS than committedLSN → CatchUp.
|
|
|
|
|
|
replicaLSN := state.CommittedLSN - 1 |
|
|
|
|
|
if replicaLSN == 0 && state.CommittedLSN > 1 { |
|
|
|
|
|
replicaLSN = state.CommittedLSN - 1 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 3: executor drives v2bridge → blockvol WAL scan.
|
|
|
|
|
|
transferred, err := executor.StreamWALEntries(0, state.WALHeadLSN) |
|
|
|
|
|
|
|
|
plan, err := driver.PlanRecovery("vol1/vs2", replicaLSN) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
t.Fatalf("chain: WAL scan failed: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
if transferred != state.WALHeadLSN { |
|
|
|
|
|
t.Fatalf("chain: transferred=%d, want=%d", transferred, state.WALHeadLSN) |
|
|
|
|
|
|
|
|
t.Fatalf("plan: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 4: progress reported to engine sender.
|
|
|
|
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
|
|
|
sessID := s.SessionID() |
|
|
|
|
|
|
|
|
t.Logf("catch-up: replica=%d outcome=%s", replicaLSN, plan.Outcome) |
|
|
|
|
|
|
|
|
// For ZeroGap, session is already completed by ExecuteRecovery.
|
|
|
|
|
|
// But the WAL scan chain above is proven real.
|
|
|
|
|
|
if s.State() == engine.StateInSync { |
|
|
|
|
|
t.Log("chain: sender InSync (ZeroGap auto-completed)") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if plan.Outcome == engine.OutcomeCatchUp { |
|
|
|
|
|
// Step 3: engine executor drives catch-up through ONE CHAIN.
|
|
|
|
|
|
exec := engine.NewCatchUpExecutor(driver, plan) |
|
|
|
|
|
|
|
|
|
|
|
// Build progress LSNs from replica to committed.
|
|
|
|
|
|
var progressLSNs []uint64 |
|
|
|
|
|
for lsn := replicaLSN + 1; lsn <= state.CommittedLSN; lsn++ { |
|
|
|
|
|
progressLSNs = append(progressLSNs, lsn) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
t.Logf("chain: WAL scan transferred LSN 0→%d through v2bridge→blockvol", transferred) |
|
|
|
|
|
_ = sessID |
|
|
|
|
|
|
|
|
if err := exec.Execute(progressLSNs, 0); err != nil { |
|
|
|
|
|
t.Fatalf("catch-up executor: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
|
|
|
if s.State() != engine.StateInSync { |
|
|
|
|
|
t.Fatalf("catch-up: state=%s, want InSync", s.State()) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Verify pins released.
|
|
|
|
|
|
if pinner.ActiveHoldCount() != 0 { |
|
|
|
|
|
t.Fatalf("catch-up: %d pins leaked", pinner.ActiveHoldCount()) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
t.Log("catch-up: ONE CHAIN proven: plan → CatchUpExecutor → complete → InSync → pins released") |
|
|
|
|
|
} else if plan.Outcome == engine.OutcomeZeroGap { |
|
|
|
|
|
// V1 interim: committed = checkpoint. If replica at committed-1 is still
|
|
|
|
|
|
// within the tail, it's CatchUp. If not, it's ZeroGap or NeedsRebuild.
|
|
|
|
|
|
t.Logf("catch-up: V1 interim → %s (replica=%d committed=%d tail=%d)", |
|
|
|
|
|
plan.Outcome, replicaLSN, state.CommittedLSN, state.WALTailLSN) |
|
|
|
|
|
t.Log("catch-up: V1 interim prevents engine-triggered CatchUp when committed=tail") |
|
|
|
|
|
} else { |
|
|
|
|
|
t.Logf("catch-up: outcome=%s", plan.Outcome) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// --- Live rebuild chain (full-base) ---
|
|
|
|
|
|
|
|
|
// --- ONE CHAIN: Full-base rebuild closure ---
|
|
|
|
|
|
|
|
|
func TestP2_LiveRebuildChain_FullBase(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, executor := setupChainTest(t) |
|
|
|
|
|
|
|
|
func TestP2_RebuildClosure_FullBase_OneChain(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, _, pinner := setupChainTest(t) |
|
|
vol := reader.vol |
|
|
vol := reader.vol |
|
|
|
|
|
|
|
|
// Write + flush → force checkpoint advance → create rebuild condition.
|
|
|
|
|
|
|
|
|
// Write + flush → force rebuild condition.
|
|
|
for i := 0; i < 20; i++ { |
|
|
for i := 0; i < 20; i++ { |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))) |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))) |
|
|
} |
|
|
} |
|
|
@ -121,104 +146,67 @@ func TestP2_LiveRebuildChain_FullBase(t *testing.T) { |
|
|
t.Fatal("rebuild: ForceFlush must advance tail") |
|
|
t.Fatal("rebuild: ForceFlush must advance tail") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 1: assignment → catch-up fails → NeedsRebuild.
|
|
|
|
|
|
intent := ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(intent) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Step 1: catch-up fails → NeedsRebuild.
|
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
if plan.Outcome != engine.OutcomeNeedsRebuild { |
|
|
if plan.Outcome != engine.OutcomeNeedsRebuild { |
|
|
t.Fatalf("rebuild: outcome=%s (expected NeedsRebuild, tail=%d)", plan.Outcome, state.WALTailLSN) |
|
|
|
|
|
|
|
|
t.Fatalf("rebuild: outcome=%s (expected NeedsRebuild)", plan.Outcome) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 2: rebuild assignment.
|
|
|
// Step 2: rebuild assignment.
|
|
|
rebuildIntent := ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(rebuildIntent) |
|
|
|
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "rebuilding")) |
|
|
|
|
|
|
|
|
// Step 3: plan rebuild from real storage state.
|
|
|
|
|
|
|
|
|
// Step 3: plan rebuild from real storage.
|
|
|
rebuildPlan, err := driver.PlanRebuild("vol1/vs2") |
|
|
rebuildPlan, err := driver.PlanRebuild("vol1/vs2") |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
t.Fatalf("rebuild: plan: %v", err) |
|
|
|
|
|
|
|
|
t.Fatalf("rebuild plan: %v", err) |
|
|
} |
|
|
} |
|
|
t.Logf("rebuild: source=%s", rebuildPlan.RebuildSource) |
|
|
|
|
|
|
|
|
|
|
|
// Step 4: executor drives v2bridge → blockvol full-base transfer.
|
|
|
|
|
|
if err := executor.TransferFullBase(state.CommittedLSN); err != nil { |
|
|
|
|
|
t.Fatalf("rebuild: TransferFullBase: %v", err) |
|
|
|
|
|
|
|
|
// Step 4: engine RebuildExecutor drives the chain.
|
|
|
|
|
|
exec := engine.NewRebuildExecutor(driver, rebuildPlan) |
|
|
|
|
|
if err := exec.Execute(); err != nil { |
|
|
|
|
|
t.Fatalf("rebuild executor: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 5: complete rebuild through engine.
|
|
|
|
|
|
|
|
|
// Step 5: verify final state.
|
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
sessID := s.SessionID() |
|
|
|
|
|
s.BeginConnect(sessID) |
|
|
|
|
|
s.RecordHandshake(sessID, 0, state.CommittedLSN) |
|
|
|
|
|
s.SelectRebuildSource(sessID, 0, false, state.CommittedLSN) |
|
|
|
|
|
s.BeginRebuildTransfer(sessID) |
|
|
|
|
|
s.RecordRebuildTransferProgress(sessID, state.CommittedLSN) |
|
|
|
|
|
if err := s.CompleteRebuild(sessID); err != nil { |
|
|
|
|
|
t.Fatalf("rebuild: complete: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if s.State() != engine.StateInSync { |
|
|
if s.State() != engine.StateInSync { |
|
|
t.Fatalf("rebuild: state=%s", s.State()) |
|
|
|
|
|
|
|
|
t.Fatalf("rebuild: state=%s, want InSync", s.State()) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Release resources.
|
|
|
|
|
|
driver.ReleasePlan(rebuildPlan) |
|
|
|
|
|
|
|
|
|
|
|
t.Logf("rebuild: full-base transfer → complete → InSync") |
|
|
|
|
|
|
|
|
|
|
|
// Observability: log shows rebuild chain.
|
|
|
|
|
|
hasRebuildComplete := false |
|
|
|
|
|
for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { |
|
|
|
|
|
if e.Event == "exec_rebuild_completed" || e.Event == "plan_rebuild_full_base" { |
|
|
|
|
|
hasRebuildComplete = true |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Step 6: verify pins released by executor.
|
|
|
|
|
|
if pinner.ActiveHoldCount() != 0 { |
|
|
|
|
|
t.Fatalf("rebuild: %d pins leaked", pinner.ActiveHoldCount()) |
|
|
} |
|
|
} |
|
|
// Note: exec_rebuild_completed comes from RebuildExecutor, not manual sender calls.
|
|
|
|
|
|
// The manual sender calls above prove the chain works but don't use the executor wrapper.
|
|
|
|
|
|
_ = hasRebuildComplete |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
t.Logf("rebuild: ONE CHAIN proven: plan → RebuildExecutor → TransferFullBase → complete → InSync → pins released") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// --- Execution resource cleanup ---
|
|
|
|
|
|
|
|
|
// --- Cleanup: cancel releases all resources ---
|
|
|
|
|
|
|
|
|
func TestP2_CancelDuringExecution_ReleasesResources(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, _ := setupChainTest(t) |
|
|
|
|
|
|
|
|
func TestP2_CancelDuringExecution_ReleasesAll(t *testing.T) { |
|
|
|
|
|
driver, ca, reader, _, pinner := setupChainTest(t) |
|
|
vol := reader.vol |
|
|
vol := reader.vol |
|
|
|
|
|
|
|
|
for i := 0; i < 5; i++ { |
|
|
for i := 0; i < 5; i++ { |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|
|
vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
intent := ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(intent) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
|
|
|
|
|
|
// Cancel mid-plan (simulates epoch bump or address change).
|
|
|
|
|
|
driver.CancelPlan(plan, "epoch_bump_during_execution") |
|
|
|
|
|
|
|
|
// Cancel mid-plan.
|
|
|
|
|
|
driver.CancelPlan(plan, "epoch_bump") |
|
|
|
|
|
|
|
|
// Sender should not have a dangling session.
|
|
|
|
|
|
|
|
|
// Session invalidated.
|
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
s := driver.Orchestrator.Registry.Sender("vol1/vs2") |
|
|
if s.HasActiveSession() { |
|
|
if s.HasActiveSession() { |
|
|
t.Fatal("session should be invalidated after CancelPlan") |
|
|
|
|
|
|
|
|
t.Fatal("session should be invalidated") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Pins released.
|
|
|
|
|
|
if pinner.ActiveHoldCount() != 0 { |
|
|
|
|
|
t.Fatalf("cancel: %d pins leaked", pinner.ActiveHoldCount()) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Log shows cancellation.
|
|
|
// Log shows cancellation.
|
|
|
@ -233,10 +221,10 @@ func TestP2_CancelDuringExecution_ReleasesResources(t *testing.T) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// --- Observability: execution causality ---
|
|
|
|
|
|
|
|
|
// --- Observability: execution explains causality ---
|
|
|
|
|
|
|
|
|
func TestP2_Observability_ExecutionCausality(t *testing.T) { |
|
|
func TestP2_Observability_ExecutionCausality(t *testing.T) { |
|
|
driver, ca, reader, _ := setupChainTest(t) |
|
|
|
|
|
|
|
|
driver, ca, reader, _, _ := setupChainTest(t) |
|
|
vol := reader.vol |
|
|
vol := reader.vol |
|
|
|
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
for i := 0; i < 10; i++ { |
|
|
@ -246,45 +234,32 @@ func TestP2_Observability_ExecutionCausality(t *testing.T) { |
|
|
|
|
|
|
|
|
state := reader.ReadState() |
|
|
state := reader.ReadState() |
|
|
if state.WALTailLSN == 0 { |
|
|
if state.WALTailLSN == 0 { |
|
|
t.Fatal("need post-flush state for observability test") |
|
|
|
|
|
|
|
|
t.Fatal("need post-flush state") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
intent := ca.ToAssignmentIntent( |
|
|
|
|
|
bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, |
|
|
|
|
|
[]bridge.MasterAssignment{ |
|
|
|
|
|
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", |
|
|
|
|
|
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, |
|
|
|
|
|
}, |
|
|
|
|
|
) |
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(intent) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
plan, _ := driver.PlanRecovery("vol1/vs2", 0) |
|
|
|
|
|
|
|
|
// Should escalate to NeedsRebuild.
|
|
|
|
|
|
if plan.Outcome != engine.OutcomeNeedsRebuild { |
|
|
if plan.Outcome != engine.OutcomeNeedsRebuild { |
|
|
t.Fatalf("outcome=%s", plan.Outcome) |
|
|
t.Fatalf("outcome=%s", plan.Outcome) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Observability: logs explain WHY.
|
|
|
|
|
|
|
|
|
// Verify log explains why.
|
|
|
events := driver.Orchestrator.Log.EventsFor("vol1/vs2") |
|
|
events := driver.Orchestrator.Log.EventsFor("vol1/vs2") |
|
|
|
|
|
|
|
|
hasSenderAdded := false |
|
|
|
|
|
hasEscalation := false |
|
|
|
|
|
|
|
|
required := map[string]bool{ |
|
|
|
|
|
"sender_added": false, |
|
|
|
|
|
"session_created": false, |
|
|
|
|
|
"connected": false, |
|
|
|
|
|
"escalated": false, |
|
|
|
|
|
} |
|
|
for _, e := range events { |
|
|
for _, e := range events { |
|
|
if e.Event == "sender_added" { |
|
|
|
|
|
hasSenderAdded = true |
|
|
|
|
|
} |
|
|
|
|
|
if e.Event == "escalated" { |
|
|
|
|
|
hasEscalation = true |
|
|
|
|
|
|
|
|
if _, ok := required[e.Event]; ok { |
|
|
|
|
|
required[e.Event] = true |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if !hasSenderAdded { |
|
|
|
|
|
t.Fatal("observability: must show sender_added") |
|
|
|
|
|
} |
|
|
|
|
|
if !hasEscalation { |
|
|
|
|
|
t.Fatal("observability: must show escalation with reason") |
|
|
|
|
|
|
|
|
for event, found := range required { |
|
|
|
|
|
if !found { |
|
|
|
|
|
t.Fatalf("observability: missing %s event", event) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
t.Logf("observability: %d events explain execution causality", len(events)) |
|
|
|
|
|
} |
|
|
} |