From f5c0aab4540926fce827a96861e5e59867031e7d Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 13:33:52 -0700 Subject: [PATCH] fix: rebuild executor consumes bound plan, fix catch-up timing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Planner/executor contract: - RebuildExecutor.Execute() takes no arguments — consumes plan-bound RebuildSource, RebuildSnapshotLSN, RebuildTargetLSN - RecoveryPlan binds all rebuild targets at plan time - Executor cannot re-derive policy from caller-supplied history Catch-up timing: - Removed unused completeTick parameter from CatchUpExecutor.Execute - Per-step ticks synthesized as startTick + stepIndex + 1 - API shape matches implementation New test: PlanExecuteConsistency_RebuildCannotSwitchSource - Plans snapshot+tail, then mutates storage history - Executor succeeds using plan-bound values (not re-derived) Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/engine/replication/driver.go | 21 +++++--- sw-block/engine/replication/executor.go | 44 +++++++++-------- sw-block/engine/replication/executor_test.go | 50 ++++++++++++++++---- 3 files changed, 80 insertions(+), 35 deletions(-) diff --git a/sw-block/engine/replication/driver.go b/sw-block/engine/replication/driver.go index e7b7e2ebf..8eab1f1c2 100644 --- a/sw-block/engine/replication/driver.go +++ b/sw-block/engine/replication/driver.go @@ -23,6 +23,7 @@ import "fmt" // a stepwise execution interface. // RecoveryPlan represents a planned recovery operation with acquired resources. +// The executor consumes this plan — it does not re-derive policy. type RecoveryPlan struct { ReplicaID string SessionID uint64 @@ -34,10 +35,14 @@ type RecoveryPlan struct { SnapshotPin *SnapshotPin // for snapshot+tail rebuild FullBasePin *FullBasePin // for full-base rebuild - // Targets. - CatchUpTarget uint64 // for catch-up: target LSN + // Catch-up targets (bound at plan time). + CatchUpTarget uint64 // for catch-up: committed LSN at plan time TruncateLSN uint64 // non-zero if truncation required - RebuildSource RebuildSource + + // Rebuild targets (bound at plan time). + RebuildSource RebuildSource + RebuildSnapshotLSN uint64 // for snapshot+tail: the snapshot LSN + RebuildTargetLSN uint64 // committed LSN at plan time } // RecoveryDriver plans and executes recovery operations using real @@ -138,10 +143,12 @@ func (d *RecoveryDriver) PlanRebuild(replicaID string) (*RecoveryPlan, error) { source, snapLSN := history.RebuildSourceDecision() plan := &RecoveryPlan{ - ReplicaID: replicaID, - SessionID: sessID, - Outcome: OutcomeNeedsRebuild, - RebuildSource: source, + ReplicaID: replicaID, + SessionID: sessID, + Outcome: OutcomeNeedsRebuild, + RebuildSource: source, + RebuildSnapshotLSN: snapLSN, + RebuildTargetLSN: history.CommittedLSN, } if source == RebuildSnapshotTail { diff --git a/sw-block/engine/replication/executor.go b/sw-block/engine/replication/executor.go index 6ee33712c..410f48194 100644 --- a/sw-block/engine/replication/executor.go +++ b/sw-block/engine/replication/executor.go @@ -36,8 +36,10 @@ func NewCatchUpExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *CatchUpExec // 4. CompleteSessionByID // 5. Release resources // +// progressLSNs are the stepwise LSN targets. startTick is the time when +// catch-up begins. Each step is assigned tick = startTick + stepIndex + 1. // On any failure or cancellation, resources are released before returning. -func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick, completeTick uint64) error { +func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick uint64) error { s := e.driver.Orchestrator.Registry.Sender(e.replicaID) if s == nil { e.release("sender_not_found") @@ -135,39 +137,45 @@ func NewRebuildExecutor(driver *RecoveryDriver, plan *RecoveryPlan) *RebuildExec } } -// Execute runs the full rebuild lifecycle stepwise: -// 1. BeginConnect + RecordHandshake -// 2. SelectRebuildFromHistory -// 3. BeginRebuildTransfer + progress steps -// 4. BeginRebuildTailReplay + progress steps (snapshot+tail only) +// Execute runs the full rebuild lifecycle from the bound plan. +// Does NOT re-derive policy from caller-supplied history — uses +// plan.RebuildSource, plan.RebuildSnapshotLSN, plan.RebuildTargetLSN. +// +// Steps: +// 1. BeginConnect + RecordHandshake (target from plan) +// 2. SelectRebuildSource (source/snapshot from plan) +// 3. BeginRebuildTransfer + progress +// 4. BeginRebuildTailReplay + progress (snapshot+tail only) // 5. CompleteRebuild // 6. Release resources -func (e *RebuildExecutor) Execute(history *RetainedHistory) error { +func (e *RebuildExecutor) Execute() error { s := e.driver.Orchestrator.Registry.Sender(e.replicaID) if s == nil { e.release("sender_not_found") return fmt.Errorf("sender %q not found", e.replicaID) } - // Step 1: connect + handshake. + plan := e.plan + + // Step 1: connect + handshake with plan-bound target. if err := s.BeginConnect(e.sessID); err != nil { e.release(fmt.Sprintf("connect_failed: %s", err)) return err } - if err := s.RecordHandshake(e.sessID, 0, history.CommittedLSN); err != nil { + if err := s.RecordHandshake(e.sessID, 0, plan.RebuildTargetLSN); err != nil { e.release(fmt.Sprintf("handshake_failed: %s", err)) return err } - // Step 2: select source from history. - if err := s.SelectRebuildFromHistory(e.sessID, history); err != nil { + // Step 2: select source from plan-bound values (not re-derived). + valid := plan.RebuildSource == RebuildSnapshotTail + if err := s.SelectRebuildSource(e.sessID, plan.RebuildSnapshotLSN, valid, plan.RebuildTargetLSN); err != nil { e.release(fmt.Sprintf("source_select_failed: %s", err)) return err } - source, snapLSN := history.RebuildSourceDecision() e.driver.Orchestrator.Log.Record(e.replicaID, e.sessID, "exec_rebuild_started", - fmt.Sprintf("source=%s", source)) + fmt.Sprintf("source=%s target=%d", plan.RebuildSource, plan.RebuildTargetLSN)) // Step 3: transfer. if err := s.BeginRebuildTransfer(e.sessID); err != nil { @@ -175,9 +183,8 @@ func (e *RebuildExecutor) Execute(history *RetainedHistory) error { return err } - if source == RebuildSnapshotTail { - // Transfer snapshot base. - if err := s.RecordRebuildTransferProgress(e.sessID, snapLSN); err != nil { + if plan.RebuildSource == RebuildSnapshotTail { + if err := s.RecordRebuildTransferProgress(e.sessID, plan.RebuildSnapshotLSN); err != nil { e.release(fmt.Sprintf("transfer_progress_failed: %s", err)) return err } @@ -193,13 +200,12 @@ func (e *RebuildExecutor) Execute(history *RetainedHistory) error { e.release(fmt.Sprintf("tail_replay_failed: %s", err)) return err } - if err := s.RecordRebuildTailProgress(e.sessID, history.CommittedLSN); err != nil { + if err := s.RecordRebuildTailProgress(e.sessID, plan.RebuildTargetLSN); err != nil { e.release(fmt.Sprintf("tail_progress_failed: %s", err)) return err } } else { - // Full base transfer. - if err := s.RecordRebuildTransferProgress(e.sessID, history.CommittedLSN); err != nil { + if err := s.RecordRebuildTransferProgress(e.sessID, plan.RebuildTargetLSN); err != nil { e.release(fmt.Sprintf("transfer_progress_failed: %s", err)) return err } diff --git a/sw-block/engine/replication/executor_test.go b/sw-block/engine/replication/executor_test.go index ca622c0ca..bb6fdd84c 100644 --- a/sw-block/engine/replication/executor_test.go +++ b/sw-block/engine/replication/executor_test.go @@ -79,7 +79,7 @@ func TestExecutor_E1_PartialCatchUp_ProgressFailure_ReleasesWAL(t *testing.T) { exec := NewCatchUpExecutor(driver, plan) // Progress with LSN that will exceed frozen target (target=100, try 101). - err := exec.Execute([]uint64{80, 90, 101}, 0, 0) + err := exec.Execute([]uint64{80, 90, 101}, 0) if err == nil { t.Fatal("should fail on progress beyond frozen target") } @@ -124,7 +124,7 @@ func TestExecutor_E1_BudgetEscalation_ReleasesWAL(t *testing.T) { exec := NewCatchUpExecutor(driver, plan) // Execute with ticks that exceed budget (3 steps, each +1 tick, budget=3). - err := exec.Execute([]uint64{600, 700, 800, 900}, 0, 0) + err := exec.Execute([]uint64{600, 700, 800, 900}, 0) if err == nil { t.Fatal("should escalate on budget") } @@ -153,7 +153,7 @@ func TestExecutor_E2_PartialRebuild_TransferFailure_ReleasesAll(t *testing.T) { // Invalidate session before execution → will fail at connect. driver.Orchestrator.Registry.Sender("r1").UpdateEpoch(2) - err := exec.Execute(&storage.history) + err := exec.Execute() if err == nil { t.Fatal("should fail on invalidated session") } @@ -178,7 +178,7 @@ func TestExecutor_E3_EpochBump_MidCatchUp_ReleasesWAL(t *testing.T) { driver.Orchestrator.UpdateSenderEpoch("r1", 2) exec := NewCatchUpExecutor(driver, plan) - err := exec.Execute([]uint64{80, 90, 100}, 0, 0) + err := exec.Execute([]uint64{80, 90, 100}, 0) if err == nil { t.Fatal("should fail on stale session") } @@ -228,7 +228,7 @@ func TestExecutor_E4_SuccessfulCatchUp_ReleasesWAL(t *testing.T) { storage := driver.Storage.(*mockStorage) exec := NewCatchUpExecutor(driver, plan) - err := exec.Execute([]uint64{80, 90, 100}, 0, 0) + err := exec.Execute([]uint64{80, 90, 100}, 0) if err != nil { t.Fatal(err) } @@ -245,7 +245,7 @@ func TestExecutor_E4_SuccessfulRebuild_ReleasesAll(t *testing.T) { driver, plan, storage := setupRebuildDriver(t) exec := NewRebuildExecutor(driver, plan) - err := exec.Execute(&storage.history) + err := exec.Execute() if err != nil { t.Fatal(err) } @@ -258,13 +258,45 @@ func TestExecutor_E4_SuccessfulRebuild_ReleasesAll(t *testing.T) { } } +// --- Plan/execute consistency --- + +func TestExecutor_PlanExecuteConsistency_RebuildCannotSwitchSource(t *testing.T) { + // Plan acquires snapshot+tail resources. + // Prove the executor uses plan-bound source, not re-derived policy. + driver, plan, storage := setupRebuildDriver(t) + + if plan.RebuildSource != RebuildSnapshotTail { + t.Fatalf("plan source=%s", plan.RebuildSource) + } + if plan.RebuildSnapshotLSN != 50 { + t.Fatalf("plan snapshot=%d", plan.RebuildSnapshotLSN) + } + if plan.RebuildTargetLSN != 100 { + t.Fatalf("plan target=%d", plan.RebuildTargetLSN) + } + + // Even if storage history changes after planning, executor uses plan. + storage.history.CheckpointTrusted = false // would cause full_base if re-derived + storage.history.TailLSN = 80 // would make snapshot unreplayable + + exec := NewRebuildExecutor(driver, plan) + err := exec.Execute() // uses plan-bound values, not storage.history + if err != nil { + t.Fatalf("execution should succeed from plan-bound values: %v", err) + } + + if driver.Orchestrator.Registry.Sender("r1").State() != StateInSync { + t.Fatalf("state=%s", driver.Orchestrator.Registry.Sender("r1").State()) + } +} + // --- E5: Executor drives sender APIs stepwise --- func TestExecutor_E5_CatchUp_StepwiseNotConvenience(t *testing.T) { driver, plan := setupCatchUpDriver(t, 70) exec := NewCatchUpExecutor(driver, plan) - err := exec.Execute([]uint64{80, 90, 100}, 0, 0) + err := exec.Execute([]uint64{80, 90, 100}, 0) if err != nil { t.Fatal(err) } @@ -287,10 +319,10 @@ func TestExecutor_E5_CatchUp_StepwiseNotConvenience(t *testing.T) { } func TestExecutor_E5_Rebuild_StepwiseNotConvenience(t *testing.T) { - driver, plan, storage := setupRebuildDriver(t) + driver, plan, _ := setupRebuildDriver(t) exec := NewRebuildExecutor(driver, plan) - err := exec.Execute(&storage.history) + err := exec.Execute() if err != nil { t.Fatal(err) }