From 50442acb2ecce1b28b6a7998a39fc98a0006d95c Mon Sep 17 00:00:00 2001
From: pingqiu
Date: Mon, 30 Mar 2026 13:24:37 -0700
Subject: [PATCH] feat: add stepwise executor with release symmetry (Phase 06 P2)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New: executor.go — CatchUpExecutor + RebuildExecutor

Replaces convenience wrappers with stepwise execution that owns the
resource lifecycle on every exit path.

CatchUpExecutor.Execute:
1. BeginCatchUp (freezes target)
2. Stepwise RecordCatchUpProgress + CheckBudget per step
3. RecordTruncation (if required)
4. CompleteSessionByID
5. Release resources (success or failure)

RebuildExecutor.Execute:
1. BeginConnect + RecordHandshake
2. SelectRebuildFromHistory
3. BeginRebuildTransfer + progress
4. BeginRebuildTailReplay + progress (snapshot+tail)
5. CompleteRebuild
6. Release resources (success or failure)

Both executors:
- Release all pins on every exit path (success, failure, cancellation)
- Check session validity mid-execution (detect epoch bump / endpoint change)
- Log resource release with causal reason

10 new tests (executor_test.go), mapped to tester expectations:
- E1: Partial catch-up failure releases WAL pin (2 tests)
- E2: Partial rebuild failure releases all pins (1 test)
- E3: Epoch bump / cancel releases resources (3 tests)
- E4: Successful execution releases resources (2 tests)
- E5: Stepwise, not convenience (2 tests)

Delivery template:
Changed contracts: executor owns resource lifecycle (not caller)
Fail-closed: session check mid-execution, release on every error
Resources: WAL/snapshot/full-base pins released on all exit paths
Carry-forward: CompleteCatchUp/CompleteRebuild remain test-only

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 sw-block/engine/replication/executor.go      | 234 ++++++++++++++
 sw-block/engine/replication/executor_test.go | 312 +++++++++++++++++++
 2 files changed, 546 insertions(+)
 create mode 100644 sw-block/engine/replication/executor.go
 create mode 100644 sw-block/engine/replication/executor_test.go

diff --git a/sw-block/engine/replication/executor.go b/sw-block/engine/replication/executor.go
new file mode 100644
index 000000000..6ee33712c
--- /dev/null
+++ b/sw-block/engine/replication/executor.go
@@ -0,0 +1,234 @@
+package replication
+
+import "fmt"
+
+// === Phase 06 P2: Stepwise Executor ===
+//
+// Replaces CompleteCatchUp/CompleteRebuild convenience wrappers with
+// explicit stepwise execution that owns resource lifecycle.
+// All exit paths (success, failure, cancellation) release resources.
+
+// CatchUpExecutor drives stepwise catch-up execution with resource lifecycle.
+// Created from a RecoveryPlan. Releases all resources on every exit path.
+type CatchUpExecutor struct {
+	driver    *RecoveryDriver
+	plan      *RecoveryPlan
+	replicaID string
+	sessID    uint64
+	released  bool
+}
+
+// NewCatchUpExecutor creates an executor from a plan. The plan's resources
+// are now owned by the executor — do not call ReleasePlan separately.
+func NewCatchUpExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *CatchUpExecutor {
+	return &CatchUpExecutor{
+		driver:    driver,
+		plan:      plan,
+		replicaID: plan.ReplicaID,
+		sessID:    plan.SessionID,
+	}
+}
+
+// Execute runs the full catch-up lifecycle stepwise:
+// 1. BeginCatchUp (with startTick)
+// 2. For each progress step: RecordCatchUpProgress + CheckBudget
+// 3. RecordTruncation (if required)
+// 4. CompleteSessionByID
+// 5. Release resources
+//
+// On any failure or cancellation, resources are released before returning.
+// Note: completeTick is accepted but not currently consumed by any step.
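+//
+// Illustrative usage (a sketch mirroring the tests in executor_test.go; the
+// surrounding setup is an assumption, not prescribed by this API):
+//
+//	plan, err := driver.PlanRecovery("r1", replicaFlushedLSN)
+//	if err != nil {
+//		return err
+//	}
+//	exec := NewCatchUpExecutor(driver, plan) // exec now owns the plan's resources
+//	if err := exec.Execute(progressLSNs, startTick, completeTick); err != nil {
+//		return err // resources already released by the executor
+//	}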
+func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick, completeTick uint64) error {
+	s := e.driver.Orchestrator.Registry.Sender(e.replicaID)
+	if s == nil {
+		e.release("sender_not_found")
+		return fmt.Errorf("sender %q not found", e.replicaID)
+	}
+
+	// Step 1: begin catch-up.
+	if err := s.BeginCatchUp(e.sessID, startTick); err != nil {
+		e.release(fmt.Sprintf("begin_catchup_failed: %s", err))
+		return err
+	}
+	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_catchup_started",
+		fmt.Sprintf("steps=%d tick=%d", len(progressLSNs), startTick))
+
+	// Step 2: stepwise progress.
+	for i, lsn := range progressLSNs {
+		// Check if session was invalidated (epoch bump / endpoint change).
+		if !s.HasActiveSession() || s.SessionID() != e.sessID {
+			e.release("session_invalidated_mid_execution")
+			return fmt.Errorf("session invalidated during catch-up step %d", i)
+		}
+
+		tick := startTick + uint64(i+1)
+		if err := s.RecordCatchUpProgress(e.sessID, lsn, tick); err != nil {
+			e.release(fmt.Sprintf("progress_failed_step_%d: %s", i, err))
+			return err
+		}
+
+		// Check budget after each step.
+		v, err := s.CheckBudget(e.sessID, tick)
+		if err != nil {
+			e.release(fmt.Sprintf("budget_check_failed: %s", err))
+			return err
+		}
+		if v != BudgetOK {
+			e.release(fmt.Sprintf("budget_escalated: %s", v))
+			return fmt.Errorf("budget violation at step %d: %s", i, v)
+		}
+	}
+
+	// Step 3: truncation (if required).
+	if e.plan.TruncateLSN > 0 {
+		if err := s.RecordTruncation(e.sessID, e.plan.TruncateLSN); err != nil {
+			e.release(fmt.Sprintf("truncation_failed: %s", err))
+			return err
+		}
+		e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_truncation",
+			fmt.Sprintf("truncated_to=%d", e.plan.TruncateLSN))
+	}
+
+	// Step 4: complete.
+	if !s.CompleteSessionByID(e.sessID) {
+		e.release("completion_rejected")
+		return fmt.Errorf("completion rejected")
+	}
+
+	// Step 5: release resources on success.
+	e.release("")
+	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_completed", "in_sync")
+	return nil
+}
+
+// Cancel aborts the executor and releases all resources.
+func (e *CatchUpExecutor) Cancel(reason string) {
+	e.release(fmt.Sprintf("cancelled: %s", reason))
+}
+
+func (e *CatchUpExecutor) release(reason string) {
+	if e.released {
+		return
+	}
+	e.released = true
+	e.driver.ReleasePlan(e.plan)
+	if reason != "" {
+		e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_resources_released", reason)
+	}
+}
+
+// RebuildExecutor drives stepwise rebuild execution with resource lifecycle.
+type RebuildExecutor struct {
+	driver    *RecoveryDriver
+	plan      *RecoveryPlan
+	replicaID string
+	sessID    uint64
+	released  bool
+}
+
+// NewRebuildExecutor creates a rebuild executor from a plan.
+func NewRebuildExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *RebuildExecutor {
+	return &RebuildExecutor{
+		driver:    driver,
+		plan:      plan,
+		replicaID: plan.ReplicaID,
+		sessID:    plan.SessionID,
+	}
+}
+
+// Execute runs the full rebuild lifecycle stepwise:
+// 1. BeginConnect + RecordHandshake
+// 2. SelectRebuildFromHistory
+// 3. BeginRebuildTransfer + progress steps
+// 4. BeginRebuildTailReplay + progress steps (snapshot+tail only)
+// 5. CompleteRebuild
+// 6. Release resources
+func (e *RebuildExecutor) Execute(history *RetainedHistory) error {
+	s := e.driver.Orchestrator.Registry.Sender(e.replicaID)
+	if s == nil {
+		e.release("sender_not_found")
+		return fmt.Errorf("sender %q not found", e.replicaID)
+	}
+
+	// Step 1: connect + handshake.
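+	// The handshake below reports the pair (0, history.CommittedLSN); the 0
+	// is read here as the rebuild target's starting flushed LSN (a rebuild
+	// target has no usable local state). That reading is inferred from this
+	// call site, not from a documented RecordHandshake contract.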
+	if err := s.BeginConnect(e.sessID); err != nil {
+		e.release(fmt.Sprintf("connect_failed: %s", err))
+		return err
+	}
+	if err := s.RecordHandshake(e.sessID, 0, history.CommittedLSN); err != nil {
+		e.release(fmt.Sprintf("handshake_failed: %s", err))
+		return err
+	}
+
+	// Step 2: select source from history.
+	if err := s.SelectRebuildFromHistory(e.sessID, history); err != nil {
+		e.release(fmt.Sprintf("source_select_failed: %s", err))
+		return err
+	}
+
+	source, snapLSN := history.RebuildSourceDecision()
+	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_rebuild_started",
+		fmt.Sprintf("source=%s", source))
+
+	// Step 3: transfer.
+	if err := s.BeginRebuildTransfer(e.sessID); err != nil {
+		e.release(fmt.Sprintf("transfer_failed: %s", err))
+		return err
+	}
+
+	if source == RebuildSnapshotTail {
+		// Transfer snapshot base.
+		if err := s.RecordRebuildTransferProgress(e.sessID, snapLSN); err != nil {
+			e.release(fmt.Sprintf("transfer_progress_failed: %s", err))
+			return err
+		}
+
+		// Check invalidation before tail replay.
+		if !s.HasActiveSession() || s.SessionID() != e.sessID {
+			e.release("session_invalidated_mid_rebuild")
+			return fmt.Errorf("session invalidated during rebuild")
+		}
+
+		// Step 4: tail replay.
+		if err := s.BeginRebuildTailReplay(e.sessID); err != nil {
+			e.release(fmt.Sprintf("tail_replay_failed: %s", err))
+			return err
+		}
+		if err := s.RecordRebuildTailProgress(e.sessID, history.CommittedLSN); err != nil {
+			e.release(fmt.Sprintf("tail_progress_failed: %s", err))
+			return err
+		}
+	} else {
+		// Full base transfer.
+		if err := s.RecordRebuildTransferProgress(e.sessID, history.CommittedLSN); err != nil {
+			e.release(fmt.Sprintf("transfer_progress_failed: %s", err))
+			return err
+		}
+	}
+
+	// Step 5: complete.
+	if err := s.CompleteRebuild(e.sessID); err != nil {
+		e.release(fmt.Sprintf("rebuild_completion_failed: %s", err))
+		return err
+	}
+
+	// Step 6: release on success.
+	e.release("")
+	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_rebuild_completed", "in_sync")
+	return nil
+}
+
+// Cancel aborts the rebuild executor and releases all resources.
+func (e *RebuildExecutor) Cancel(reason string) {
+	e.release(fmt.Sprintf("cancelled: %s", reason))
+}
+
+func (e *RebuildExecutor) release(reason string) {
+	if e.released {
+		return
+	}
+	e.released = true
+	e.driver.ReleasePlan(e.plan)
+	if reason != "" {
+		e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_resources_released", reason)
+	}
+}
diff --git a/sw-block/engine/replication/executor_test.go b/sw-block/engine/replication/executor_test.go
new file mode 100644
index 000000000..ca622c0ca
--- /dev/null
+++ b/sw-block/engine/replication/executor_test.go
@@ -0,0 +1,312 @@
+package replication
+
+import (
+	"testing"
+)
+
+// ============================================================
+// Phase 06 P2: Executor tests — stepwise execution with release symmetry
+//
+// Tests map to tester expectation template E1-E5.
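+//
+//   E1: partial catch-up failure releases the WAL pin (2 tests)
+//   E2: partial rebuild failure releases all pins (1 test)
+//   E3: epoch bump / cancellation releases resources (3 tests)
+//   E4: successful execution releases resources (2 tests)
+//   E5: stepwise execution, not a convenience wrapper (2 tests)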
+// ============================================================
+
+func setupCatchUpDriver(t *testing.T, replicaFlushedLSN uint64) (*RecoveryDriver, *RecoveryPlan) {
+	t.Helper()
+	storage := newMockStorage(RetainedHistory{
+		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
+	})
+	driver := NewRecoveryDriver(storage)
+
+	driver.Orchestrator.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+
+	plan, err := driver.PlanRecovery("r1", replicaFlushedLSN)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return driver, plan
+}
+
+func setupRebuildDriver(t *testing.T) (*RecoveryDriver, *RecoveryPlan, *mockStorage) {
+	t.Helper()
+	storage := newMockStorage(RetainedHistory{
+		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
+		CheckpointLSN: 50, CheckpointTrusted: true,
+	})
+	driver := NewRecoveryDriver(storage)
+
+	// First: catch-up fails → NeedsRebuild.
+	driver.Orchestrator.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+	driver.PlanRecovery("r1", 10) // NeedsRebuild (10 < tail 30)
+
+	// Rebuild assignment.
+	driver.Orchestrator.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionRebuild},
+	})
+
+	plan, err := driver.PlanRebuild("r1")
+	if err != nil {
+		t.Fatal(err)
+	}
+	return driver, plan, storage
+}
+
+// --- E1: Partial catch-up releases resources on failure ---
+
+func TestExecutor_E1_PartialCatchUp_ProgressFailure_ReleasesWAL(t *testing.T) {
+	driver, plan := setupCatchUpDriver(t, 70)
+	storage := driver.Storage.(*mockStorage)
+
+	if len(storage.pinnedWAL) != 1 {
+		t.Fatalf("WAL pin should exist before execution: %d", len(storage.pinnedWAL))
+	}
+
+	exec := NewCatchUpExecutor(driver, plan)
+
+	// Progress with LSN that will exceed frozen target (target=100, try 101).
+	err := exec.Execute([]uint64{80, 90, 101}, 0, 0)
+	if err == nil {
+		t.Fatal("should fail on progress beyond frozen target")
+	}
+
+	// WAL pin released on failure.
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E1: WAL pin must be released after partial catch-up failure")
+	}
+
+	// Release event logged.
+	hasRelease := false
+	for _, e := range driver.Orchestrator.Log.EventsFor("r1") {
+		if e.Event == "exec_resources_released" {
+			hasRelease = true
+		}
+	}
+	if !hasRelease {
+		t.Fatal("E1: resource release should be logged")
+	}
+}
+
+func TestExecutor_E1_BudgetEscalation_ReleasesWAL(t *testing.T) {
+	storage := newMockStorage(RetainedHistory{
+		HeadLSN: 1000, TailLSN: 0, CommittedLSN: 1000,
+	})
+	driver := NewRecoveryDriver(storage)
+
+	driver.Orchestrator.ProcessAssignment(AssignmentIntent{
+		Replicas: []ReplicaAssignment{
+			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
+		},
+		Epoch:           1,
+		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
+	})
+
+	// Attach with budget.
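+	// Replace the session set up by the assignment with one that carries an
+	// explicit CatchUpBudget, so the executor's per-step CheckBudget can trip.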
+	s := driver.Orchestrator.Registry.Sender("r1")
+	s.InvalidateSession("setup", StateDisconnected)
+	s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxDurationTicks: 3}))
+
+	plan, _ := driver.PlanRecovery("r1", 500)
+	exec := NewCatchUpExecutor(driver, plan)
+
+	// Execute with ticks that exceed the budget (4 steps, each +1 tick, budget=3).
+	err := exec.Execute([]uint64{600, 700, 800, 900}, 0, 0)
+	if err == nil {
+		t.Fatal("should escalate on budget")
+	}
+
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E1: WAL pin must be released after budget escalation")
+	}
+}
+
+// --- E2: Partial rebuild releases resources on failure ---
+
+func TestExecutor_E2_PartialRebuild_TransferFailure_ReleasesAll(t *testing.T) {
+	driver, plan, storage := setupRebuildDriver(t)
+
+	if plan.RebuildSource != RebuildSnapshotTail {
+		t.Fatalf("source=%s", plan.RebuildSource)
+	}
+
+	// Pins should exist.
+	if len(storage.pinnedSnaps) == 0 || len(storage.pinnedWAL) == 0 {
+		t.Fatal("snapshot + WAL pins should exist before execution")
+	}
+
+	exec := NewRebuildExecutor(driver, plan)
+
+	// Invalidate session before execution → will fail at connect.
+	driver.Orchestrator.Registry.Sender("r1").UpdateEpoch(2)
+
+	err := exec.Execute(&storage.history)
+	if err == nil {
+		t.Fatal("should fail on invalidated session")
+	}
+
+	// All pins released.
+	if len(storage.pinnedSnaps) != 0 {
+		t.Fatal("E2: snapshot pin must be released after failed rebuild")
+	}
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E2: WAL pin must be released after failed rebuild")
+	}
+}
+
+// --- E3: Cancellation mid-execution releases resources ---
+
+func TestExecutor_E3_EpochBump_MidCatchUp_ReleasesWAL(t *testing.T) {
+	driver, plan := setupCatchUpDriver(t, 70)
+	storage := driver.Storage.(*mockStorage)
+
+	// Invalidate session BEFORE execution to simulate epoch bump.
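+	// The plan (and its WAL pin) predates the bump, so Execute must fail
+	// fast, at BeginCatchUp or at the first mid-step validity check, and
+	// still release the pin.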
+	driver.Orchestrator.InvalidateEpoch(2)
+	driver.Orchestrator.UpdateSenderEpoch("r1", 2)
+
+	exec := NewCatchUpExecutor(driver, plan)
+	err := exec.Execute([]uint64{80, 90, 100}, 0, 0)
+	if err == nil {
+		t.Fatal("should fail on stale session")
+	}
+
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E3: WAL pin must be released after epoch bump")
+	}
+}
+
+func TestExecutor_E3_Cancel_ReleasesResources(t *testing.T) {
+	driver, plan := setupCatchUpDriver(t, 70)
+	storage := driver.Storage.(*mockStorage)
+
+	exec := NewCatchUpExecutor(driver, plan)
+	exec.Cancel("test_cancellation")
+
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E3: WAL pin must be released after cancellation")
+	}
+
+	hasRelease := false
+	for _, e := range driver.Orchestrator.Log.EventsFor("r1") {
+		if e.Event == "exec_resources_released" {
+			hasRelease = true
+		}
+	}
+	if !hasRelease {
+		t.Fatal("E3: cancellation release should be logged")
+	}
+}
+
+func TestExecutor_E3_RebuildCancel_ReleasesAll(t *testing.T) {
+	driver, plan, storage := setupRebuildDriver(t)
+
+	exec := NewRebuildExecutor(driver, plan)
+	exec.Cancel("epoch_bump")
+
+	if len(storage.pinnedSnaps) != 0 || len(storage.pinnedWAL) != 0 {
+		t.Fatal("E3: all rebuild pins must be released after cancellation")
+	}
+}
+
+// --- E4: Successful execution releases resources ---
+
+func TestExecutor_E4_SuccessfulCatchUp_ReleasesWAL(t *testing.T) {
+	driver, plan := setupCatchUpDriver(t, 70)
+	storage := driver.Storage.(*mockStorage)
+
+	exec := NewCatchUpExecutor(driver, plan)
+	err := exec.Execute([]uint64{80, 90, 100}, 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(storage.pinnedWAL) != 0 {
+		t.Fatal("E4: WAL pin must be released after successful catch-up")
+	}
+	if driver.Orchestrator.Registry.Sender("r1").State() != StateInSync {
+		t.Fatalf("state=%s", driver.Orchestrator.Registry.Sender("r1").State())
+	}
+}
+
+func TestExecutor_E4_SuccessfulRebuild_ReleasesAll(t *testing.T) {
+	driver, plan, storage := setupRebuildDriver(t)
+
+	exec := NewRebuildExecutor(driver, plan)
+	err := exec.Execute(&storage.history)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(storage.pinnedSnaps) != 0 || len(storage.pinnedWAL) != 0 {
+		t.Fatal("E4: all pins must be released after successful rebuild")
+	}
+	if driver.Orchestrator.Registry.Sender("r1").State() != StateInSync {
+		t.Fatalf("state=%s", driver.Orchestrator.Registry.Sender("r1").State())
+	}
+}
+
+// --- E5: Executor drives sender APIs stepwise ---
+
+func TestExecutor_E5_CatchUp_StepwiseNotConvenience(t *testing.T) {
+	driver, plan := setupCatchUpDriver(t, 70)
+
+	exec := NewCatchUpExecutor(driver, plan)
+	err := exec.Execute([]uint64{80, 90, 100}, 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify stepwise execution happened via log events.
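+	// The exec_* events are emitted only from the executor's stepwise path
+	// (see executor.go), so requiring both the start and completion markers
+	// shows the steps were driven individually rather than by a one-shot
+	// convenience call.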
+	events := driver.Orchestrator.Log.EventsFor("r1")
+	hasStarted := false
+	hasCompleted := false
+	for _, e := range events {
+		if e.Event == "exec_catchup_started" {
+			hasStarted = true
+		}
+		if e.Event == "exec_completed" {
+			hasCompleted = true
+		}
+	}
+	if !hasStarted || !hasCompleted {
+		t.Fatal("E5: executor should log stepwise start and completion")
+	}
+}
+
+func TestExecutor_E5_Rebuild_StepwiseNotConvenience(t *testing.T) {
+	driver, plan, storage := setupRebuildDriver(t)
+
+	exec := NewRebuildExecutor(driver, plan)
+	err := exec.Execute(&storage.history)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	events := driver.Orchestrator.Log.EventsFor("r1")
+	hasStarted := false
+	hasCompleted := false
+	for _, e := range events {
+		if e.Event == "exec_rebuild_started" {
+			hasStarted = true
+		}
+		if e.Event == "exec_rebuild_completed" {
+			hasCompleted = true
+		}
+	}
+	if !hasStarted || !hasCompleted {
+		t.Fatal("E5: rebuild executor should log stepwise start and completion")
+	}
+}