@ -2,14 +2,31 @@ package replication
import "fmt"
// === Phase 06 P2: Stepwise Executor ===
//
// Replaces CompleteCatchUp/CompleteRebuild convenience wrappers with
// explicit stepwise execution that owns resource lifecycle.
// All exit paths (success, failure, cancellation) release resources.
// === Phase 06 P2 / Phase 08 P2: Stepwise Executor ===
// CatchUpIO is the I/O interface that the catch-up executor calls to
// perform real WAL streaming. Implemented by the v2bridge executor.
// The engine defines this interface; it does NOT import weed/.
type CatchUpIO interface {
// StreamWALEntries reads WAL entries from startExclusive+1 to endInclusive.
// Returns the highest LSN successfully transferred.
StreamWALEntries ( startExclusive , endInclusive uint64 ) ( transferredTo uint64 , err error )
}
// RebuildIO is the I/O interface that the rebuild executor calls to
// perform real data transfer. Implemented by the v2bridge executor.
type RebuildIO interface {
// TransferFullBase transfers the full extent image at committedLSN.
TransferFullBase ( committedLSN uint64 ) error
// TransferSnapshot transfers a checkpoint/snapshot at snapshotLSN.
TransferSnapshot ( snapshotLSN uint64 ) error
// StreamWALEntries for tail replay after snapshot transfer.
StreamWALEntries ( startExclusive , endInclusive uint64 ) ( transferredTo uint64 , err error )
}
// CatchUpExecutor drives stepwise catch-up execution with resource lifecycle.
// Created from a RecoveryPlan. Releases all resources on every exit path.
// When IO is set, the executor calls real bridge I/O for WAL streaming.
// When IO is nil, the executor uses caller-supplied progress LSNs (test mode).
type CatchUpExecutor struct {
driver * RecoveryDriver
plan * RecoveryPlan
@ -17,9 +34,10 @@ type CatchUpExecutor struct {
sessID uint64
released bool
// IO performs real WAL streaming. If nil, uses progressLSNs from Execute().
IO CatchUpIO
// OnStep is an optional callback invoked between executor-managed steps.
// Used for deterministic fault injection in tests (e.g., epoch bump).
// step is the 0-based index of the completed step.
OnStep func ( step int )
}
@ -57,36 +75,48 @@ func (e *CatchUpExecutor) Execute(progressLSNs []uint64, startTick uint64) error
return err
}
e . driver . Orchestrator . Log . Record ( e . replicaID , e . sessID , "exec_catchup_started" ,
fmt . Sprintf ( "steps=%d tick=%d" , len ( progressLSNs ) , startTick ) )
fmt . Sprintf ( "target=%d tick=%d io=%v" , e . plan . CatchUpTarget , startTick , e . IO != nil ) )
// Step 2: stepwise progress.
for i , lsn := range progressLSNs {
// Check if session was invalidated (epoch bump / endpoint change).
if ! s . HasActiveSession ( ) || s . SessionID ( ) != e . sessID {
e . release ( "session_invalidated_mid_execution" )
return fmt . Errorf ( "session invalidated during catch-up step %d" , i )
// Step 2: progress — either via real IO bridge or caller-supplied LSNs.
if e . IO != nil {
// Real I/O path: stream WAL entries through the bridge.
transferred , err := e . IO . StreamWALEntries ( e . plan . CatchUpStartLSN , e . plan . CatchUpTarget )
if err != nil {
e . release ( fmt . Sprintf ( "io_stream_failed: %s" , err ) )
return err
}
tick := startTick + uint64 ( i + 1 )
if err := s . RecordCatchUpProgress ( e . sessID , lsn , tick ) ; err != nil {
e . release ( fmt . Sprintf ( "progress_failed_step_%d: %s" , i , err ) )
tick := startTick + 1
if err := s . RecordCatchUpProgress ( e . sessID , transferred , tick ) ; err != nil {
e . release ( fmt . Sprintf ( "progress_after_io: %s" , err ) )
return err
}
} else {
// Test path: caller-supplied progress LSNs.
for i , lsn := range progressLSNs {
if ! s . HasActiveSession ( ) || s . SessionID ( ) != e . sessID {
e . release ( "session_invalidated_mid_execution" )
return fmt . Errorf ( "session invalidated during catch-up step %d" , i )
}
// Fire step callback (test hook for fault injection).
if e . OnStep != nil {
e . OnStep ( i )
}
tick := startTick + uint64 ( i + 1 )
if err := s . RecordCatchUpProgress ( e . sessID , lsn , tick ) ; err != nil {
e . release ( fmt . Sprintf ( "progress_failed_step_%d: %s" , i , err ) )
return err
}
// Check budget after each step.
v , err := s . CheckBudget ( e . sessID , tick )
if err != nil {
e . release ( fmt . Sprintf ( "budget_check_failed: %s" , err ) )
return err
}
if v != BudgetOK {
e . release ( fmt . Sprintf ( "budget_escalated: %s" , v ) )
return fmt . Errorf ( "budget violation at step %d: %s" , i , v )
if e . OnStep != nil {
e . OnStep ( i )
}
v , err := s . CheckBudget ( e . sessID , tick )
if err != nil {
e . release ( fmt . Sprintf ( "budget_check_failed: %s" , err ) )
return err
}
if v != BudgetOK {
e . release ( fmt . Sprintf ( "budget_escalated: %s" , v ) )
return fmt . Errorf ( "budget violation at step %d: %s" , i , v )
}
}
}
@ -129,12 +159,17 @@ func (e *CatchUpExecutor) release(reason string) {
}
// RebuildExecutor drives stepwise rebuild execution with resource lifecycle.
// When IO is set, the executor calls real bridge I/O for data transfer.
// When IO is nil, the executor only advances sender/session state (test mode).
type RebuildExecutor struct {
driver * RecoveryDriver
plan * RecoveryPlan
replicaID string
sessID uint64
released bool
// IO performs real rebuild data transfer. If nil, only advances FSM (test mode).
IO RebuildIO
}
// NewRebuildExecutor creates a rebuild executor from a plan.
@ -185,36 +220,56 @@ func (e *RebuildExecutor) Execute() error {
}
e . driver . Orchestrator . Log . Record ( e . replicaID , e . sessID , "exec_rebuild_started" ,
fmt . Sprintf ( "source=%s target=%d" , plan . RebuildSource , plan . RebuildTargetLSN ) )
fmt . Sprintf ( "source=%s target=%d io=%v " , plan . RebuildSource , plan . RebuildTargetLSN , e . IO != nil ) )
// Step 3: transfer.
// Step 3: transfer — with real I/O if bridge is wired .
if err := s . BeginRebuildTransfer ( e . sessID ) ; err != nil {
e . release ( fmt . Sprintf ( "transfer_failed: %s" , err ) )
return err
}
if plan . RebuildSource == RebuildSnapshotTail {
// Real I/O: transfer snapshot through bridge.
if e . IO != nil {
if err := e . IO . TransferSnapshot ( plan . RebuildSnapshotLSN ) ; err != nil {
e . release ( fmt . Sprintf ( "io_snapshot_transfer_failed: %s" , err ) )
return err
}
}
if err := s . RecordRebuildTransferProgress ( e . sessID , plan . RebuildSnapshotLSN ) ; err != nil {
e . release ( fmt . Sprintf ( "transfer_progress_failed: %s" , err ) )
return err
}
// Check invalidation before tail replay.
if ! s . HasActiveSession ( ) || s . SessionID ( ) != e . sessID {
e . release ( "session_invalidated_mid_rebuild" )
return fmt . Errorf ( "session invalidated during rebuild" )
}
// Step 4: tail replay.
if err := s . BeginRebuildTailReplay ( e . sessID ) ; err != nil {
e . release ( fmt . Sprintf ( "tail_replay_failed: %s" , err ) )
return err
}
// Real I/O: stream WAL tail through bridge.
if e . IO != nil {
_ , ioErr := e . IO . StreamWALEntries ( plan . RebuildSnapshotLSN , plan . RebuildTargetLSN )
if ioErr != nil {
e . release ( fmt . Sprintf ( "io_tail_replay_failed: %s" , ioErr ) )
return ioErr
}
}
if err := s . RecordRebuildTailProgress ( e . sessID , plan . RebuildTargetLSN ) ; err != nil {
e . release ( fmt . Sprintf ( "tail_progress_failed: %s" , err ) )
return err
}
} else {
// Real I/O: transfer full base through bridge.
if e . IO != nil {
if err := e . IO . TransferFullBase ( plan . RebuildTargetLSN ) ; err != nil {
e . release ( fmt . Sprintf ( "io_full_base_failed: %s" , err ) )
return err
}
}
if err := s . RecordRebuildTransferProgress ( e . sessID , plan . RebuildTargetLSN ) ; err != nil {
e . release ( fmt . Sprintf ( "transfer_progress_failed: %s" , err ) )
return err