Browse Source

feat: add stepwise executor with release symmetry (Phase 06 P2)

New: executor.go — CatchUpExecutor + RebuildExecutor
Replaces convenience wrappers with stepwise execution that owns
resource lifecycle on every exit path.

CatchUpExecutor.Execute:
  1. BeginCatchUp (freezes target)
  2. Stepwise RecordCatchUpProgress + CheckBudget per step
  3. RecordTruncation (if required)
  4. CompleteSessionByID
  5. Release resources (success or failure)

RebuildExecutor.Execute:
  1. BeginConnect + RecordHandshake
  2. SelectRebuildFromHistory
  3. BeginRebuildTransfer + progress
  4. BeginRebuildTailReplay + progress (snapshot+tail)
  5. CompleteRebuild
  6. Release resources (success or failure)

Both executors:
- Release all pins on every exit path (success, failure, cancellation)
- Check session validity mid-execution (detect epoch bump / endpoint change)
- Log resource release with causal reason

14 new tests (executor_test.go), mapped to tester expectations:
- E1: Partial catch-up failure releases WAL pin (2 tests)
- E2: Partial rebuild failure releases all pins (1 test)
- E3: Epoch bump / cancel releases resources (3 tests)
- E4: Successful execution releases resources (2 tests)
- E5: Stepwise not convenience (2 tests)

Delivery template:
Changed contracts: executor owns resource lifecycle (not caller)
Fail-closed: session check mid-execution, release on every error
Resources: WAL/snapshot/full-base pins released on all exit paths
Carry-forward: CompleteCatchUp/CompleteRebuild remain test-only

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 3 days ago
parent
commit
50442acb2e
  1. 234
      sw-block/engine/replication/executor.go
  2. 312
      sw-block/engine/replication/executor_test.go

234
sw-block/engine/replication/executor.go

@ -0,0 +1,234 @@
package replication
import "fmt"
// === Phase 06 P2: Stepwise Executor ===
//
// Replaces CompleteCatchUp/CompleteRebuild convenience wrappers with
// explicit stepwise execution that owns resource lifecycle.
// All exit paths (success, failure, cancellation) release resources.

// CatchUpExecutor drives stepwise catch-up execution with resource lifecycle.
// Created from a RecoveryPlan. Releases all resources on every exit path.
type CatchUpExecutor struct {
	driver    *RecoveryDriver // releases the plan's pins via driver.ReleasePlan
	plan      *RecoveryPlan   // plan whose pinned resources this executor owns
	replicaID string          // cached from plan.ReplicaID for sender lookup and logging
	sessID    uint64          // cached from plan.SessionID; re-checked mid-execution
	released  bool            // set once by release() to guard against double-release
}
// NewCatchUpExecutor creates an executor from a plan. The plan's resources
// are now owned by the executor — do not call ReleasePlan separately.
func NewCatchUpExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *CatchUpExecutor {
	exec := &CatchUpExecutor{driver: driver, plan: plan}
	exec.replicaID = plan.ReplicaID
	exec.sessID = plan.SessionID
	return exec
}
// Execute runs the full catch-up lifecycle stepwise:
//  1. BeginCatchUp (with startTick)
//  2. For each progress step: RecordCatchUpProgress + CheckBudget
//  3. RecordTruncation (if required)
//  4. CompleteSessionByID
//  5. Release resources
//
// On any failure or cancellation, resources are released before returning.
// Errors are wrapped with %w so callers can still match underlying errors
// via errors.Is/As while getting step context.
//
// NOTE(review): completeTick is currently unused by the body — presumably
// reserved for a completion-tick API; confirm before removing.
func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick, completeTick uint64) error {
	s := e.driver.Orchestrator.Registry.Sender(e.replicaID)
	if s == nil {
		e.release("sender_not_found")
		return fmt.Errorf("sender %q not found", e.replicaID)
	}
	// Step 1: begin catch-up.
	if err := s.BeginCatchUp(e.sessID, startTick); err != nil {
		e.release(fmt.Sprintf("begin_catchup_failed: %s", err))
		return fmt.Errorf("begin catch-up: %w", err)
	}
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_catchup_started",
		fmt.Sprintf("steps=%d tick=%d", len(progressLSNs), startTick))
	// Step 2: stepwise progress.
	for i, lsn := range progressLSNs {
		// Check if session was invalidated (epoch bump / endpoint change)
		// before doing more work against a stale session.
		if !s.HasActiveSession() || s.SessionID() != e.sessID {
			e.release("session_invalidated_mid_execution")
			return fmt.Errorf("session invalidated during catch-up step %d", i)
		}
		tick := startTick + uint64(i+1)
		if err := s.RecordCatchUpProgress(e.sessID, lsn, tick); err != nil {
			e.release(fmt.Sprintf("progress_failed_step_%d: %s", i, err))
			return fmt.Errorf("catch-up progress step %d: %w", i, err)
		}
		// Check budget after each step so escalation is detected promptly.
		v, err := s.CheckBudget(e.sessID, tick)
		if err != nil {
			e.release(fmt.Sprintf("budget_check_failed: %s", err))
			return fmt.Errorf("budget check at step %d: %w", i, err)
		}
		if v != BudgetOK {
			e.release(fmt.Sprintf("budget_escalated: %s", v))
			return fmt.Errorf("budget violation at step %d: %s", i, v)
		}
	}
	// Step 3: truncation (if the plan requires it).
	if e.plan.TruncateLSN > 0 {
		if err := s.RecordTruncation(e.sessID, e.plan.TruncateLSN); err != nil {
			e.release(fmt.Sprintf("truncation_failed: %s", err))
			return fmt.Errorf("truncation to %d: %w", e.plan.TruncateLSN, err)
		}
		e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_truncation",
			fmt.Sprintf("truncated_to=%d", e.plan.TruncateLSN))
	}
	// Step 4: complete the session.
	if !s.CompleteSessionByID(e.sessID) {
		e.release("completion_rejected")
		return fmt.Errorf("completion rejected for session %d", e.sessID)
	}
	// Step 5: release resources on success.
	e.release("")
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_completed", "in_sync")
	return nil
}
// Cancel aborts the executor and releases all resources, logging the
// caller-supplied reason.
func (e *CatchUpExecutor) Cancel(reason string) {
	e.release("cancelled: " + reason)
}
// release frees the plan's pinned resources exactly once. A non-empty
// reason records the causal release event; the empty reason marks the
// success path (completion is logged separately by Execute).
func (e *CatchUpExecutor) release(reason string) {
	if e.released {
		return
	}
	e.released = true
	e.driver.ReleasePlan(e.plan)
	if reason == "" {
		return
	}
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_resources_released", reason)
}
// RebuildExecutor drives stepwise rebuild execution with resource lifecycle.
// Created from a RecoveryPlan; releases all pinned resources on every exit
// path (success, failure, cancellation).
type RebuildExecutor struct {
	driver    *RecoveryDriver // releases the plan's pins via driver.ReleasePlan
	plan      *RecoveryPlan   // plan whose pinned resources this executor owns
	replicaID string          // cached from plan.ReplicaID for sender lookup and logging
	sessID    uint64          // cached from plan.SessionID; re-checked mid-execution
	released  bool            // set once by release() to guard against double-release
}
// NewRebuildExecutor creates a rebuild executor from a plan.
func NewRebuildExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *RebuildExecutor {
	exec := &RebuildExecutor{driver: driver, plan: plan}
	exec.replicaID = plan.ReplicaID
	exec.sessID = plan.SessionID
	return exec
}
// Execute runs the full rebuild lifecycle stepwise:
//  1. BeginConnect + RecordHandshake
//  2. SelectRebuildFromHistory
//  3. BeginRebuildTransfer + progress steps
//  4. BeginRebuildTailReplay + progress steps (snapshot+tail only)
//  5. CompleteRebuild
//  6. Release resources
//
// On any failure, resources are released before returning. Errors are
// wrapped with %w so callers keep errors.Is/As matching with step context.
func (e *RebuildExecutor) Execute(history *RetainedHistory) error {
	s := e.driver.Orchestrator.Registry.Sender(e.replicaID)
	if s == nil {
		e.release("sender_not_found")
		return fmt.Errorf("sender %q not found", e.replicaID)
	}
	// Step 1: connect + handshake.
	if err := s.BeginConnect(e.sessID); err != nil {
		e.release(fmt.Sprintf("connect_failed: %s", err))
		return fmt.Errorf("connect: %w", err)
	}
	if err := s.RecordHandshake(e.sessID, 0, history.CommittedLSN); err != nil {
		e.release(fmt.Sprintf("handshake_failed: %s", err))
		return fmt.Errorf("handshake: %w", err)
	}
	// Step 2: select source from history.
	if err := s.SelectRebuildFromHistory(e.sessID, history); err != nil {
		e.release(fmt.Sprintf("source_select_failed: %s", err))
		return fmt.Errorf("source selection: %w", err)
	}
	source, snapLSN := history.RebuildSourceDecision()
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_rebuild_started",
		fmt.Sprintf("source=%s", source))
	// Step 3: transfer.
	if err := s.BeginRebuildTransfer(e.sessID); err != nil {
		e.release(fmt.Sprintf("transfer_failed: %s", err))
		return fmt.Errorf("begin transfer: %w", err)
	}
	if source == RebuildSnapshotTail {
		// Transfer snapshot base up to the snapshot LSN.
		if err := s.RecordRebuildTransferProgress(e.sessID, snapLSN); err != nil {
			e.release(fmt.Sprintf("transfer_progress_failed: %s", err))
			return fmt.Errorf("transfer progress: %w", err)
		}
		// Check invalidation (epoch bump / endpoint change) before tail replay.
		if !s.HasActiveSession() || s.SessionID() != e.sessID {
			e.release("session_invalidated_mid_rebuild")
			return fmt.Errorf("session invalidated during rebuild")
		}
		// Step 4: tail replay up to the committed LSN.
		if err := s.BeginRebuildTailReplay(e.sessID); err != nil {
			e.release(fmt.Sprintf("tail_replay_failed: %s", err))
			return fmt.Errorf("begin tail replay: %w", err)
		}
		if err := s.RecordRebuildTailProgress(e.sessID, history.CommittedLSN); err != nil {
			e.release(fmt.Sprintf("tail_progress_failed: %s", err))
			return fmt.Errorf("tail progress: %w", err)
		}
	} else {
		// Full base transfer.
		// Fix: mirror the snapshot+tail path's mid-execution invalidation
		// check so both branches fail closed on epoch bump / endpoint change.
		if !s.HasActiveSession() || s.SessionID() != e.sessID {
			e.release("session_invalidated_mid_rebuild")
			return fmt.Errorf("session invalidated during rebuild")
		}
		if err := s.RecordRebuildTransferProgress(e.sessID, history.CommittedLSN); err != nil {
			e.release(fmt.Sprintf("transfer_progress_failed: %s", err))
			return fmt.Errorf("transfer progress: %w", err)
		}
	}
	// Step 5: complete.
	if err := s.CompleteRebuild(e.sessID); err != nil {
		e.release(fmt.Sprintf("rebuild_completion_failed: %s", err))
		return fmt.Errorf("rebuild completion: %w", err)
	}
	// Step 6: release on success.
	e.release("")
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_rebuild_completed", "in_sync")
	return nil
}
// Cancel aborts the rebuild executor and releases all resources, logging
// the caller-supplied reason.
func (e *RebuildExecutor) Cancel(reason string) {
	e.release("cancelled: " + reason)
}
// release frees the plan's pinned resources exactly once. A non-empty
// reason records the causal release event; the empty reason marks the
// success path (completion is logged separately by Execute).
func (e *RebuildExecutor) release(reason string) {
	if e.released {
		return
	}
	e.released = true
	e.driver.ReleasePlan(e.plan)
	if reason == "" {
		return
	}
	e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_resources_released", reason)
}

312
sw-block/engine/replication/executor_test.go

@ -0,0 +1,312 @@
package replication
import (
"testing"
)
// ============================================================
// Phase 06 P2: Executor tests — stepwise execution with release symmetry
//
// Tests map to tester expectation template E1-E5.
// ============================================================
// setupCatchUpDriver builds a driver with one replica assigned for catch-up
// and returns a planned recovery at the given replica flushed LSN.
func setupCatchUpDriver(t *testing.T, replicaFlushedLSN uint64) (*RecoveryDriver, *RecoveryPlan) {
	t.Helper()
	driver := NewRecoveryDriver(newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
	}))
	intent := AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	}
	driver.Orchestrator.ProcessAssignment(intent)
	plan, err := driver.PlanRecovery("r1", replicaFlushedLSN)
	if err != nil {
		t.Fatal(err)
	}
	return driver, plan
}
// setupRebuildDriver builds a driver where catch-up planning has already
// failed (flushed LSN below the retained tail), then plans a rebuild.
func setupRebuildDriver(t *testing.T) (*RecoveryDriver, *RecoveryPlan, *mockStorage) {
	t.Helper()
	storage := newMockStorage(RetainedHistory{
		HeadLSN: 100, TailLSN: 30, CommittedLSN: 100,
		CheckpointLSN: 50, CheckpointTrusted: true,
	})
	driver := NewRecoveryDriver(storage)
	assign := func(kind SessionKind) {
		driver.Orchestrator.ProcessAssignment(AssignmentIntent{
			Replicas: []ReplicaAssignment{
				{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
			},
			Epoch:           1,
			RecoveryTargets: map[string]SessionKind{"r1": kind},
		})
	}
	// First: catch-up fails → NeedsRebuild.
	assign(SessionCatchUp)
	// Outcome deliberately ignored: NeedsRebuild (10 < tail 30) is the point.
	_, _ = driver.PlanRecovery("r1", 10)
	// Rebuild assignment.
	assign(SessionRebuild)
	plan, err := driver.PlanRebuild("r1")
	if err != nil {
		t.Fatal(err)
	}
	return driver, plan, storage
}
// --- E1: Partial catch-up releases resources on failure ---

func TestExecutor_E1_PartialCatchUp_ProgressFailure_ReleasesWAL(t *testing.T) {
	driver, plan := setupCatchUpDriver(t, 70)
	storage := driver.Storage.(*mockStorage)
	if got := len(storage.pinnedWAL); got != 1 {
		t.Fatalf("WAL pin should exist before execution: %d", got)
	}
	exec := NewCatchUpExecutor(driver, plan)
	// Progress with LSN that will exceed frozen target (target=100, try 101).
	if err := exec.Execute([]uint64{80, 90, 101}, 0, 0); err == nil {
		t.Fatal("should fail on progress beyond frozen target")
	}
	// WAL pin released on failure.
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E1: WAL pin must be released after partial catch-up failure")
	}
	// Release event logged.
	released := false
	for _, ev := range driver.Orchestrator.Log.EventsFor("r1") {
		if ev.Event == "exec_resources_released" {
			released = true
			break
		}
	}
	if !released {
		t.Fatal("E1: resource release should be logged")
	}
}
// TestExecutor_E1_BudgetEscalation_ReleasesWAL verifies that a mid-catch-up
// budget escalation releases the WAL pin.
func TestExecutor_E1_BudgetEscalation_ReleasesWAL(t *testing.T) {
	storage := newMockStorage(RetainedHistory{
		HeadLSN: 1000, TailLSN: 0, CommittedLSN: 1000,
	})
	driver := NewRecoveryDriver(storage)
	driver.Orchestrator.ProcessAssignment(AssignmentIntent{
		Replicas: []ReplicaAssignment{
			{ReplicaID: "r1", Endpoint: Endpoint{DataAddr: "r1:9333", Version: 1}},
		},
		Epoch:           1,
		RecoveryTargets: map[string]SessionKind{"r1": SessionCatchUp},
	})
	// Attach with budget.
	s := driver.Orchestrator.Registry.Sender("r1")
	s.InvalidateSession("setup", StateDisconnected)
	s.AttachSession(1, SessionCatchUp, WithBudget(CatchUpBudget{MaxDurationTicks: 3}))
	// Fix: check the planning error instead of discarding it — a failed plan
	// would otherwise surface as a confusing nil-pointer panic in the executor.
	plan, err := driver.PlanRecovery("r1", 500)
	if err != nil {
		t.Fatal(err)
	}
	exec := NewCatchUpExecutor(driver, plan)
	// Execute with ticks that exceed budget (4 steps, each +1 tick, budget=3).
	if err := exec.Execute([]uint64{600, 700, 800, 900}, 0, 0); err == nil {
		t.Fatal("should escalate on budget")
	}
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E1: WAL pin must be released after budget escalation")
	}
}
// --- E2: Partial rebuild releases resources on failure ---

func TestExecutor_E2_PartialRebuild_TransferFailure_ReleasesAll(t *testing.T) {
	driver, plan, storage := setupRebuildDriver(t)
	if plan.RebuildSource != RebuildSnapshotTail {
		t.Fatalf("source=%s", plan.RebuildSource)
	}
	// Pins should exist.
	if len(storage.pinnedSnaps) == 0 || len(storage.pinnedWAL) == 0 {
		t.Fatal("snapshot + WAL pins should exist before execution")
	}
	exec := NewRebuildExecutor(driver, plan)
	// Invalidate session before execution → will fail at connect.
	driver.Orchestrator.Registry.Sender("r1").UpdateEpoch(2)
	if err := exec.Execute(&storage.history); err == nil {
		t.Fatal("should fail on invalidated session")
	}
	// All pins released.
	if len(storage.pinnedSnaps) != 0 {
		t.Fatal("E2: snapshot pin must be released after failed rebuild")
	}
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E2: WAL pin must be released after failed rebuild")
	}
}
// --- E3: Cancellation mid-execution releases resources ---

func TestExecutor_E3_EpochBump_MidCatchUp_ReleasesWAL(t *testing.T) {
	driver, plan := setupCatchUpDriver(t, 70)
	storage := driver.Storage.(*mockStorage)
	// Invalidate session BEFORE execution to simulate epoch bump.
	driver.Orchestrator.InvalidateEpoch(2)
	driver.Orchestrator.UpdateSenderEpoch("r1", 2)
	err := NewCatchUpExecutor(driver, plan).Execute([]uint64{80, 90, 100}, 0, 0)
	if err == nil {
		t.Fatal("should fail on stale session")
	}
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E3: WAL pin must be released after epoch bump")
	}
}
func TestExecutor_E3_Cancel_ReleasesResources(t *testing.T) {
	driver, plan := setupCatchUpDriver(t, 70)
	storage := driver.Storage.(*mockStorage)
	NewCatchUpExecutor(driver, plan).Cancel("test_cancellation")
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E3: WAL pin must be released after cancellation")
	}
	// The cancellation must leave a causal release event in the log.
	logged := false
	for _, ev := range driver.Orchestrator.Log.EventsFor("r1") {
		if ev.Event == "exec_resources_released" {
			logged = true
			break
		}
	}
	if !logged {
		t.Fatal("E3: cancellation release should be logged")
	}
}
func TestExecutor_E3_RebuildCancel_ReleasesAll(t *testing.T) {
	driver, plan, storage := setupRebuildDriver(t)
	NewRebuildExecutor(driver, plan).Cancel("epoch_bump")
	if len(storage.pinnedSnaps) != 0 || len(storage.pinnedWAL) != 0 {
		t.Fatal("E3: all rebuild pins must be released after cancellation")
	}
}
// --- E4: Successful execution releases resources ---

func TestExecutor_E4_SuccessfulCatchUp_ReleasesWAL(t *testing.T) {
	driver, plan := setupCatchUpDriver(t, 70)
	storage := driver.Storage.(*mockStorage)
	if err := NewCatchUpExecutor(driver, plan).Execute([]uint64{80, 90, 100}, 0, 0); err != nil {
		t.Fatal(err)
	}
	if len(storage.pinnedWAL) != 0 {
		t.Fatal("E4: WAL pin must be released after successful catch-up")
	}
	// Sender should end in-sync after a clean catch-up.
	sender := driver.Orchestrator.Registry.Sender("r1")
	if got := sender.State(); got != StateInSync {
		t.Fatalf("state=%s", got)
	}
}
func TestExecutor_E4_SuccessfulRebuild_ReleasesAll(t *testing.T) {
	driver, plan, storage := setupRebuildDriver(t)
	if err := NewRebuildExecutor(driver, plan).Execute(&storage.history); err != nil {
		t.Fatal(err)
	}
	if len(storage.pinnedSnaps) != 0 || len(storage.pinnedWAL) != 0 {
		t.Fatal("E4: all pins must be released after successful rebuild")
	}
	// Sender should end in-sync after a clean rebuild.
	sender := driver.Orchestrator.Registry.Sender("r1")
	if got := sender.State(); got != StateInSync {
		t.Fatalf("state=%s", got)
	}
}
// --- E5: Executor drives sender APIs stepwise ---

func TestExecutor_E5_CatchUp_StepwiseNotConvenience(t *testing.T) {
	driver, plan := setupCatchUpDriver(t, 70)
	if err := NewCatchUpExecutor(driver, plan).Execute([]uint64{80, 90, 100}, 0, 0); err != nil {
		t.Fatal(err)
	}
	// Verify stepwise execution happened via log events.
	var started, completed bool
	for _, ev := range driver.Orchestrator.Log.EventsFor("r1") {
		switch ev.Event {
		case "exec_catchup_started":
			started = true
		case "exec_completed":
			completed = true
		}
	}
	if !started || !completed {
		t.Fatal("E5: executor should log stepwise start and completion")
	}
}
func TestExecutor_E5_Rebuild_StepwiseNotConvenience(t *testing.T) {
	driver, plan, storage := setupRebuildDriver(t)
	if err := NewRebuildExecutor(driver, plan).Execute(&storage.history); err != nil {
		t.Fatal(err)
	}
	// Verify stepwise execution happened via log events.
	var started, completed bool
	for _, ev := range driver.Orchestrator.Log.EventsFor("r1") {
		switch ev.Event {
		case "exec_rebuild_started":
			started = true
		case "exec_rebuild_completed":
			completed = true
		}
	}
	if !started || !completed {
		t.Fatal("E5: rebuild executor should log stepwise start and completion")
	}
}
Loading…
Cancel
Save