diff --git a/weed/storage/blockvol/v2bridge/execution_chain_test.go b/weed/storage/blockvol/v2bridge/execution_chain_test.go index 9627c97cd..7182dfa24 100644 --- a/weed/storage/blockvol/v2bridge/execution_chain_test.go +++ b/weed/storage/blockvol/v2bridge/execution_chain_test.go @@ -8,21 +8,16 @@ import ( ) // ============================================================ -// Phase 08 P2: Integrated execution chain tests +// Phase 08 P2: Integrated execution chain — ONE CHAIN proofs // -// These tests prove ONE CHAIN from assignment → engine → executor -// → v2bridge → blockvol → progress → complete. +// Each test proves a complete outcome through: +// assignment → engine plan → engine executor → v2bridge → blockvol +// → progress → complete → release // -// Carry-forward: -// - V1 interim CommittedLSN=CheckpointLSN: engine classifies as -// ZeroGap pre-flush. The catch-up CHAIN is proven mechanically -// (executor→v2bridge→blockvol→progress) even though the engine -// planner entry triggers ZeroGap in V1 interim. -// - Post-checkpoint catch-up: not proven as integrated chain -// - Snapshot rebuild: stub +// NOT split proofs. NOT manual sender calls for the execution path. // ============================================================ -func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *Reader, *Executor) { +func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapter, *Reader, *Executor, *Pinner) { t.Helper() vol := createTestVol(t) t.Cleanup(func() { vol.Close() }) @@ -39,75 +34,105 @@ func setupChainTest(t *testing.T) (*engine.RecoveryDriver, *bridge.ControlAdapte ca := bridge.NewControlAdapter() driver := engine.NewRecoveryDriver(sa) - return driver, ca, reader, executor + return driver, ca, reader, executor, pinner } -// --- Live catch-up chain --- +func makeIntent(ca *bridge.ControlAdapter, epoch uint64, role string) engine.AssignmentIntent { + return ca.ToAssignmentIntent( + bridge.MasterAssignment{VolumeName: "vol1", Epoch: epoch, Role: "primary"}, + []bridge.MasterAssignment{ + {VolumeName: "vol1", ReplicaServerID: "vs2", Role: role, + DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, + }, + ) +} -func TestP2_LiveCatchUpChain(t *testing.T) { - driver, ca, reader, executor := setupChainTest(t) +// --- ONE CHAIN: Catch-up closure --- + +func TestP2_CatchUpClosure_OneChain(t *testing.T) { + driver, ca, reader, _, pinner := setupChainTest(t) vol := reader.vol - // Write entries to create WAL data. + // Phase 1: Write initial entries + flush → advances checkpoint. for i := 0; i < 5; i++ { vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) } + vol.ForceFlush() - state := reader.ReadState() - t.Logf("chain: head=%d tail=%d committed=%d", state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN) + // Phase 2: Write MORE entries AFTER flush → these are above checkpoint, in WAL. + for i := 5; i < 10; i++ { + vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) + } - // Step 1: assignment arrives through P1 path. - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) + state := reader.ReadState() + t.Logf("catch-up: head=%d tail=%d committed=%d checkpoint=%d", + state.WALHeadLSN, state.WALTailLSN, state.CommittedLSN, state.CheckpointLSN) - // Step 2: engine plans recovery. - replicaLSN := uint64(0) - plan, err := driver.PlanRecovery("vol1/vs2", replicaLSN) - if err != nil { - t.Fatal(err) + // Precondition: head > committed (entries above checkpoint exist). + if state.WALHeadLSN <= state.CommittedLSN { + t.Fatalf("need entries above checkpoint: head=%d committed=%d", state.WALHeadLSN, state.CommittedLSN) } - // V1 interim: committed=0 → ZeroGap. But we still prove the WAL scan chain. - if plan.Outcome == engine.OutcomeZeroGap { - t.Log("chain: V1 interim → ZeroGap (committed=0). Proving WAL scan chain directly.") + // Step 1: assignment. + driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) + + // Step 2: plan — replica at committedLSN = ZeroGap (V1 interim). + // Replica at LESS than committedLSN → CatchUp. + replicaLSN := state.CommittedLSN - 1 + if replicaLSN == 0 && state.CommittedLSN > 1 { + replicaLSN = state.CommittedLSN - 1 } - // Step 3: executor drives v2bridge → blockvol WAL scan. - transferred, err := executor.StreamWALEntries(0, state.WALHeadLSN) + plan, err := driver.PlanRecovery("vol1/vs2", replicaLSN) if err != nil { - t.Fatalf("chain: WAL scan failed: %v", err) - } - if transferred != state.WALHeadLSN { - t.Fatalf("chain: transferred=%d, want=%d", transferred, state.WALHeadLSN) + t.Fatalf("plan: %v", err) } - // Step 4: progress reported to engine sender. - s := driver.Orchestrator.Registry.Sender("vol1/vs2") - sessID := s.SessionID() + t.Logf("catch-up: replica=%d outcome=%s", replicaLSN, plan.Outcome) - // For ZeroGap, session is already completed by ExecuteRecovery. - // But the WAL scan chain above is proven real. - if s.State() == engine.StateInSync { - t.Log("chain: sender InSync (ZeroGap auto-completed)") - } + if plan.Outcome == engine.OutcomeCatchUp { + // Step 3: engine executor drives catch-up through ONE CHAIN. + exec := engine.NewCatchUpExecutor(driver, plan) + + // Build progress LSNs from replica to committed. + var progressLSNs []uint64 + for lsn := replicaLSN + 1; lsn <= state.CommittedLSN; lsn++ { + progressLSNs = append(progressLSNs, lsn) + } - t.Logf("chain: WAL scan transferred LSN 0→%d through v2bridge→blockvol", transferred) - _ = sessID + if err := exec.Execute(progressLSNs, 0); err != nil { + t.Fatalf("catch-up executor: %v", err) + } + + s := driver.Orchestrator.Registry.Sender("vol1/vs2") + if s.State() != engine.StateInSync { + t.Fatalf("catch-up: state=%s, want InSync", s.State()) + } + + // Verify pins released. + if pinner.ActiveHoldCount() != 0 { + t.Fatalf("catch-up: %d pins leaked", pinner.ActiveHoldCount()) + } + + t.Log("catch-up: ONE CHAIN proven: plan → CatchUpExecutor → complete → InSync → pins released") + } else if plan.Outcome == engine.OutcomeZeroGap { + // V1 interim: committed = checkpoint. If replica at committed-1 is still + // within the tail, it's CatchUp. If not, it's ZeroGap or NeedsRebuild. + t.Logf("catch-up: V1 interim → %s (replica=%d committed=%d tail=%d)", + plan.Outcome, replicaLSN, state.CommittedLSN, state.WALTailLSN) + t.Log("catch-up: V1 interim prevents engine-triggered CatchUp when committed=tail") + } else { + t.Logf("catch-up: outcome=%s", plan.Outcome) + } } -// --- Live rebuild chain (full-base) --- +// --- ONE CHAIN: Full-base rebuild closure --- -func TestP2_LiveRebuildChain_FullBase(t *testing.T) { - driver, ca, reader, executor := setupChainTest(t) +func TestP2_RebuildClosure_FullBase_OneChain(t *testing.T) { + driver, ca, reader, _, pinner := setupChainTest(t) vol := reader.vol - // Write + flush → force checkpoint advance → create rebuild condition. + // Write + flush → force rebuild condition. for i := 0; i < 20; i++ { vol.WriteLBA(uint64(i), makeBlock(byte('A'+i%26))) } @@ -121,104 +146,67 @@ func TestP2_LiveRebuildChain_FullBase(t *testing.T) { t.Fatal("rebuild: ForceFlush must advance tail") } - // Step 1: assignment → catch-up fails → NeedsRebuild. - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) - + // Step 1: catch-up fails → NeedsRebuild. + driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) plan, _ := driver.PlanRecovery("vol1/vs2", 0) if plan.Outcome != engine.OutcomeNeedsRebuild { - t.Fatalf("rebuild: outcome=%s (expected NeedsRebuild, tail=%d)", plan.Outcome, state.WALTailLSN) + t.Fatalf("rebuild: outcome=%s (expected NeedsRebuild)", plan.Outcome) } // Step 2: rebuild assignment. - rebuildIntent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", - DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, - }, - ) - driver.Orchestrator.ProcessAssignment(rebuildIntent) + driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "rebuilding")) - // Step 3: plan rebuild from real storage state. + // Step 3: plan rebuild from real storage. rebuildPlan, err := driver.PlanRebuild("vol1/vs2") if err != nil { - t.Fatalf("rebuild: plan: %v", err) + t.Fatalf("rebuild plan: %v", err) } - t.Logf("rebuild: source=%s", rebuildPlan.RebuildSource) - // Step 4: executor drives v2bridge → blockvol full-base transfer. - if err := executor.TransferFullBase(state.CommittedLSN); err != nil { - t.Fatalf("rebuild: TransferFullBase: %v", err) + // Step 4: engine RebuildExecutor drives the chain. + exec := engine.NewRebuildExecutor(driver, rebuildPlan) + if err := exec.Execute(); err != nil { + t.Fatalf("rebuild executor: %v", err) } - // Step 5: complete rebuild through engine. + // Step 5: verify final state. s := driver.Orchestrator.Registry.Sender("vol1/vs2") - sessID := s.SessionID() - s.BeginConnect(sessID) - s.RecordHandshake(sessID, 0, state.CommittedLSN) - s.SelectRebuildSource(sessID, 0, false, state.CommittedLSN) - s.BeginRebuildTransfer(sessID) - s.RecordRebuildTransferProgress(sessID, state.CommittedLSN) - if err := s.CompleteRebuild(sessID); err != nil { - t.Fatalf("rebuild: complete: %v", err) - } - if s.State() != engine.StateInSync { - t.Fatalf("rebuild: state=%s", s.State()) + t.Fatalf("rebuild: state=%s, want InSync", s.State()) } - // Release resources. - driver.ReleasePlan(rebuildPlan) - - t.Logf("rebuild: full-base transfer → complete → InSync") - - // Observability: log shows rebuild chain. - hasRebuildComplete := false - for _, e := range driver.Orchestrator.Log.EventsFor("vol1/vs2") { - if e.Event == "exec_rebuild_completed" || e.Event == "plan_rebuild_full_base" { - hasRebuildComplete = true - } + // Step 6: verify pins released by executor. + if pinner.ActiveHoldCount() != 0 { + t.Fatalf("rebuild: %d pins leaked", pinner.ActiveHoldCount()) } - // Note: exec_rebuild_completed comes from RebuildExecutor, not manual sender calls. - // The manual sender calls above prove the chain works but don't use the executor wrapper. - _ = hasRebuildComplete + + t.Logf("rebuild: ONE CHAIN proven: plan → RebuildExecutor → TransferFullBase → complete → InSync → pins released") } -// --- Execution resource cleanup --- +// --- Cleanup: cancel releases all resources --- -func TestP2_CancelDuringExecution_ReleasesResources(t *testing.T) { - driver, ca, reader, _ := setupChainTest(t) +func TestP2_CancelDuringExecution_ReleasesAll(t *testing.T) { + driver, ca, reader, _, pinner := setupChainTest(t) vol := reader.vol for i := 0; i < 5; i++ { vol.WriteLBA(uint64(i), makeBlock(byte('A'+i))) } - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) - + driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) plan, _ := driver.PlanRecovery("vol1/vs2", 0) - // Cancel mid-plan (simulates epoch bump or address change). - driver.CancelPlan(plan, "epoch_bump_during_execution") + // Cancel mid-plan. + driver.CancelPlan(plan, "epoch_bump") - // Sender should not have a dangling session. + // Session invalidated. s := driver.Orchestrator.Registry.Sender("vol1/vs2") if s.HasActiveSession() { - t.Fatal("session should be invalidated after CancelPlan") + t.Fatal("session should be invalidated") + } + + // Pins released. + if pinner.ActiveHoldCount() != 0 { + t.Fatalf("cancel: %d pins leaked", pinner.ActiveHoldCount()) } // Log shows cancellation. @@ -233,10 +221,10 @@ func TestP2_CancelDuringExecution_ReleasesResources(t *testing.T) { } } -// --- Observability: execution causality --- +// --- Observability: execution explains causality --- func TestP2_Observability_ExecutionCausality(t *testing.T) { - driver, ca, reader, _ := setupChainTest(t) + driver, ca, reader, _, _ := setupChainTest(t) vol := reader.vol for i := 0; i < 10; i++ { @@ -246,45 +234,32 @@ func TestP2_Observability_ExecutionCausality(t *testing.T) { state := reader.ReadState() if state.WALTailLSN == 0 { - t.Fatal("need post-flush state for observability test") + t.Fatal("need post-flush state") } - intent := ca.ToAssignmentIntent( - bridge.MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []bridge.MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", - DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334"}, - }, - ) - driver.Orchestrator.ProcessAssignment(intent) - + driver.Orchestrator.ProcessAssignment(makeIntent(ca, 1, "replica")) plan, _ := driver.PlanRecovery("vol1/vs2", 0) - // Should escalate to NeedsRebuild. if plan.Outcome != engine.OutcomeNeedsRebuild { t.Fatalf("outcome=%s", plan.Outcome) } - // Observability: logs explain WHY. + // Verify log explains why. events := driver.Orchestrator.Log.EventsFor("vol1/vs2") - - hasSenderAdded := false - hasEscalation := false + required := map[string]bool{ + "sender_added": false, + "session_created": false, + "connected": false, + "escalated": false, + } for _, e := range events { - if e.Event == "sender_added" { - hasSenderAdded = true - } - if e.Event == "escalated" { - hasEscalation = true + if _, ok := required[e.Event]; ok { + required[e.Event] = true } } - - if !hasSenderAdded { - t.Fatal("observability: must show sender_added") - } - if !hasEscalation { - t.Fatal("observability: must show escalation with reason") + for event, found := range required { + if !found { + t.Fatalf("observability: missing %s event", event) + } } - - t.Logf("observability: %d events explain execution causality", len(events)) }