From 8c326c871c9b1b3657ba650c9bb2d0d8bec3a50d Mon Sep 17 00:00:00 2001 From: pingqiu Date: Mon, 30 Mar 2026 18:07:20 -0700 Subject: [PATCH] feat: add contract interfaces and pin/release via release-func pattern (Phase 07 P1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit E5 handoff contract (contract.go): - BlockVolReader: ReadState() → BlockVolState from real blockvol - BlockVolPinner: HoldWALRetention/HoldSnapshot/HoldFullBase → release func - BlockVolExecutor: StreamWALEntries/TransferSnapshot/TransferFullBase/TruncateWAL - Clear import direction: weed-side imports sw-block, not reverse StorageAdapter refactored: - Consumes BlockVolReader + BlockVolPinner interfaces - Pin/release uses release-func pattern (not map-based tracking) - PushStorageAdapter for tests (push-based, no blockvol dependency) 10 bridge tests: - 4 control adapter (identity, address change, role mapping, primary) - 4 storage adapter (retained history, WAL pin reject, snapshot reject, symmetry) - 1 E2E (assignment → adapter → engine → plan → execute → InSync) - 1 contract interface verification Co-Authored-By: Claude Opus 4.6 (1M context) --- sw-block/bridge/blockvol/bridge_test.go | 190 ++++++++------------ sw-block/bridge/blockvol/contract.go | 82 +++++++++ sw-block/bridge/blockvol/storage_adapter.go | 181 +++++++++++-------- 3 files changed, 266 insertions(+), 187 deletions(-) create mode 100644 sw-block/bridge/blockvol/contract.go diff --git a/sw-block/bridge/blockvol/bridge_test.go b/sw-block/bridge/blockvol/bridge_test.go index 4ff20e2ed..53a6a89c9 100644 --- a/sw-block/bridge/blockvol/bridge_test.go +++ b/sw-block/bridge/blockvol/bridge_test.go @@ -7,54 +7,26 @@ import ( ) // ============================================================ -// Phase 07 P0: Bridge adapter tests -// Validates E1-E3 expectations against concrete adapter code. +// Phase 07 P0/P1: Bridge adapter tests // ============================================================ -// --- E1: Real assignment → engine intent --- +// --- E1: Stable identity --- func TestControlAdapter_StableIdentity(t *testing.T) { ca := NewControlAdapter() - primary := MasterAssignment{ - VolumeName: "pvc-data-1", - Epoch: 3, - Role: "primary", - PrimaryServerID: "vs1", - } - replicas := []MasterAssignment{ - { - VolumeName: "pvc-data-1", - Epoch: 3, - Role: "replica", - ReplicaServerID: "vs2", - DataAddr: "10.0.0.2:9333", - CtrlAddr: "10.0.0.2:9334", - AddrVersion: 1, + intent := ca.ToAssignmentIntent( + MasterAssignment{VolumeName: "pvc-data-1", Epoch: 3, Role: "primary", PrimaryServerID: "vs1"}, + []MasterAssignment{ + {VolumeName: "pvc-data-1", Epoch: 3, Role: "replica", ReplicaServerID: "vs2", + DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", AddrVersion: 1}, }, - } - - intent := ca.ToAssignmentIntent(primary, replicas) - - if intent.Epoch != 3 { - t.Fatalf("epoch=%d", intent.Epoch) - } - if len(intent.Replicas) != 1 { - t.Fatalf("replicas=%d", len(intent.Replicas)) - } + ) - // ReplicaID is stable: volume-name/server-id (NOT address). r := intent.Replicas[0] if r.ReplicaID != "pvc-data-1/vs2" { - t.Fatalf("ReplicaID=%s (must be volume/server, not address)", r.ReplicaID) - } - - // Endpoint is the address (mutable). - if r.Endpoint.DataAddr != "10.0.0.2:9333" { - t.Fatalf("DataAddr=%s", r.Endpoint.DataAddr) + t.Fatalf("ReplicaID=%s (must be volume/server)", r.ReplicaID) } - - // Recovery target mapped. if intent.RecoveryTargets["pvc-data-1/vs2"] != engine.SessionCatchUp { t.Fatalf("recovery=%s", intent.RecoveryTargets["pvc-data-1/vs2"]) } @@ -63,129 +35,113 @@ func TestControlAdapter_StableIdentity(t *testing.T) { func TestControlAdapter_AddressChangePreservesIdentity(t *testing.T) { ca := NewControlAdapter() - // First assignment. intent1 := ca.ToAssignmentIntent( - MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}, - []MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1}, - }, + MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, + []MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1}}, ) - - // Address changes — new assignment. intent2 := ca.ToAssignmentIntent( - MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}, - []MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2}, - }, + MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, + []MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2}}, ) - // Same ReplicaID despite different address. if intent1.Replicas[0].ReplicaID != intent2.Replicas[0].ReplicaID { - t.Fatalf("identity changed: %s → %s", - intent1.Replicas[0].ReplicaID, intent2.Replicas[0].ReplicaID) - } - - // Endpoint updated. - if intent2.Replicas[0].Endpoint.DataAddr != "10.0.0.3:9333" { - t.Fatalf("endpoint not updated: %s", intent2.Replicas[0].Endpoint.DataAddr) + t.Fatal("identity changed") } } func TestControlAdapter_RebuildRoleMapping(t *testing.T) { ca := NewControlAdapter() - intent := ca.ToAssignmentIntent( MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []MasterAssignment{ - {VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"}, - }, + []MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"}}, ) - if intent.RecoveryTargets["vol1/vs2"] != engine.SessionRebuild { - t.Fatalf("rebuilding role should map to SessionRebuild, got %s", - intent.RecoveryTargets["vol1/vs2"]) + t.Fatalf("got %s", intent.RecoveryTargets["vol1/vs2"]) } } func TestControlAdapter_PrimaryNoRecovery(t *testing.T) { ca := NewControlAdapter() - intent := ca.ToAssignmentIntent( MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"}, - []MasterAssignment{}, // no replicas + []MasterAssignment{}, ) - if len(intent.RecoveryTargets) != 0 { t.Fatal("primary should not have recovery targets") } } -// --- E2: Real storage truth → RetainedHistory --- +// --- E2: Storage adapter via contract interfaces --- -func TestStorageAdapter_RetainedHistoryFromRealState(t *testing.T) { - sa := NewStorageAdapter() - sa.UpdateState(BlockVolState{ - WALHeadLSN: 100, - WALTailLSN: 30, - CommittedLSN: 90, - CheckpointLSN: 50, - CheckpointTrusted: true, +func TestStorageAdapter_RetainedHistoryFromReader(t *testing.T) { + psa := NewPushStorageAdapter() + psa.UpdateState(BlockVolState{ + WALHeadLSN: 100, WALTailLSN: 30, CommittedLSN: 90, + CheckpointLSN: 50, CheckpointTrusted: true, }) - rh := sa.GetRetainedHistory() - - if rh.HeadLSN != 100 { - t.Fatalf("HeadLSN=%d", rh.HeadLSN) - } - if rh.TailLSN != 30 { - t.Fatalf("TailLSN=%d", rh.TailLSN) + rh := psa.GetRetainedHistory() + if rh.HeadLSN != 100 || rh.TailLSN != 30 || rh.CommittedLSN != 90 { + t.Fatalf("head=%d tail=%d committed=%d", rh.HeadLSN, rh.TailLSN, rh.CommittedLSN) } - if rh.CommittedLSN != 90 { - t.Fatalf("CommittedLSN=%d", rh.CommittedLSN) - } - if rh.CheckpointLSN != 50 { - t.Fatalf("CheckpointLSN=%d", rh.CheckpointLSN) - } - if !rh.CheckpointTrusted { - t.Fatal("CheckpointTrusted should be true") + if rh.CheckpointLSN != 50 || !rh.CheckpointTrusted { + t.Fatalf("checkpoint=%d trusted=%v", rh.CheckpointLSN, rh.CheckpointTrusted) } } func TestStorageAdapter_WALPinRejectsRecycled(t *testing.T) { - sa := NewStorageAdapter() - sa.UpdateState(BlockVolState{WALTailLSN: 50}) + psa := NewPushStorageAdapter() + psa.UpdateState(BlockVolState{WALTailLSN: 50}) - _, err := sa.PinWALRetention(30) // 30 < tail 50 + _, err := psa.PinWALRetention(30) if err == nil { - t.Fatal("WAL pin should be rejected when range is recycled") + t.Fatal("should reject recycled range") } } -func TestStorageAdapter_SnapshotPinRejectsInvalid(t *testing.T) { - sa := NewStorageAdapter() - sa.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false}) +func TestStorageAdapter_SnapshotPinRejectsUntrusted(t *testing.T) { + psa := NewPushStorageAdapter() + psa.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false}) - _, err := sa.PinSnapshot(50) + _, err := psa.PinSnapshot(50) if err == nil { - t.Fatal("snapshot pin should be rejected when checkpoint is untrusted") + t.Fatal("should reject untrusted checkpoint") } } -// --- E3: Engine integration through bridge --- +func TestStorageAdapter_PinReleaseSymmetry(t *testing.T) { + psa := NewPushStorageAdapter() + psa.UpdateState(BlockVolState{WALTailLSN: 0, CheckpointLSN: 50, CheckpointTrusted: true}) + + walPin, _ := psa.PinWALRetention(10) + snapPin, _ := psa.PinSnapshot(50) + basePin, _ := psa.PinFullBase(100) + + // Pins tracked. + if len(psa.releaseFuncs) != 3 { + t.Fatalf("pins=%d", len(psa.releaseFuncs)) + } + + // Release all. + psa.ReleaseWALRetention(walPin) + psa.ReleaseSnapshot(snapPin) + psa.ReleaseFullBase(basePin) + + if len(psa.releaseFuncs) != 0 { + t.Fatalf("leaked pins=%d", len(psa.releaseFuncs)) + } +} + +// --- E3: End-to-end bridge flow --- func TestBridge_E2E_AssignmentToRecovery(t *testing.T) { - // Full bridge flow: master assignment → adapter → engine. ca := NewControlAdapter() - sa := NewStorageAdapter() - sa.UpdateState(BlockVolState{ - WALHeadLSN: 100, - WALTailLSN: 30, - CommittedLSN: 100, - CheckpointLSN: 50, - CheckpointTrusted: true, + psa := NewPushStorageAdapter() + psa.UpdateState(BlockVolState{ + WALHeadLSN: 100, WALTailLSN: 30, CommittedLSN: 100, + CheckpointLSN: 50, CheckpointTrusted: true, }) - // Step 1: master assignment → engine intent. intent := ca.ToAssignmentIntent( MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}, []MasterAssignment{ @@ -194,11 +150,9 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) { }, ) - // Step 2: engine processes intent. - drv := engine.NewRecoveryDriver(sa) + drv := engine.NewRecoveryDriver(psa) drv.Orchestrator.ProcessAssignment(intent) - // Step 3: plan recovery from real storage state. plan, err := drv.PlanRecovery("vol1/vs2", 70) if err != nil { t.Fatal(err) @@ -206,11 +160,7 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) { if plan.Outcome != engine.OutcomeCatchUp { t.Fatalf("outcome=%s", plan.Outcome) } - if !plan.Proof.Recoverable { - t.Fatalf("proof: %s", plan.Proof.Reason) - } - // Step 4: execute through engine executor. exec := engine.NewCatchUpExecutor(drv, plan) if err := exec.Execute([]uint64{80, 90, 100}, 0); err != nil { t.Fatal(err) @@ -220,3 +170,11 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) { t.Fatalf("state=%s", drv.Orchestrator.Registry.Sender("vol1/vs2").State()) } } + +// --- E5: Contract interface boundary --- + +func TestContract_BlockVolReaderInterface(t *testing.T) { + // Verify the contract interface is implementable. + var _ BlockVolReader = &pushReader{psa: NewPushStorageAdapter()} + var _ BlockVolPinner = &pushPinner{psa: NewPushStorageAdapter()} +} diff --git a/sw-block/bridge/blockvol/contract.go b/sw-block/bridge/blockvol/contract.go new file mode 100644 index 000000000..c7f5c3b22 --- /dev/null +++ b/sw-block/bridge/blockvol/contract.go @@ -0,0 +1,82 @@ +package blockvol + +// === Phase 07 P1: Handoff contract === +// +// This file defines the interface boundary between: +// - sw-block/bridge/blockvol/ (engine-side, no weed imports) +// - weed/storage/blockvol/v2bridge/ (weed-side, real blockvol imports) +// +// The engine-side bridge defines WHAT the weed-side must provide. +// The weed-side bridge implements HOW using real blockvol internals. +// +// Import direction: +// weed/storage/blockvol/v2bridge/ → imports → sw-block/bridge/blockvol/ +// weed/storage/blockvol/v2bridge/ → imports → sw-block/engine/replication/ +// weed/storage/blockvol/v2bridge/ → imports → weed/storage/blockvol/ +// sw-block/bridge/blockvol/ → imports → sw-block/engine/replication/ +// sw-block/bridge/blockvol/ does NOT import weed/ + +// BlockVolState represents the real storage state from a blockvol instance. +// Each field maps to a specific blockvol source: +// +// WALHeadLSN ← vol.nextLSN - 1 or vol.Status().WALHeadLSN +// WALTailLSN ← vol.flusher.RetentionFloor() +// CommittedLSN ← vol.distCommit.CommittedLSN() +// CheckpointLSN ← vol.flusher.CheckpointLSN() +// CheckpointTrusted ← vol.superblock.Valid + checkpoint file exists +type BlockVolState struct { + WALHeadLSN uint64 + WALTailLSN uint64 + CommittedLSN uint64 + CheckpointLSN uint64 + CheckpointTrusted bool +} + +// BlockVolReader reads real blockvol state. Implemented by the weed-side +// bridge using actual blockvol struct fields. The engine-side bridge +// consumes this interface via the StorageAdapter. +type BlockVolReader interface { + // ReadState returns the current blockvol state snapshot. + // Must read from real blockvol fields: + // WALHeadLSN ← vol.nextLSN - 1 or vol.Status().WALHeadLSN + // WALTailLSN ← vol.flusher.RetentionFloor() + // CommittedLSN ← vol.distCommit.CommittedLSN() + // CheckpointLSN ← vol.flusher.CheckpointLSN() + // CheckpointTrusted ← superblock valid + checkpoint file exists + ReadState() BlockVolState +} + +// BlockVolPinner manages real resource holds against WAL reclaim and +// checkpoint GC. Implemented by the weed-side bridge using actual +// blockvol retention machinery. +type BlockVolPinner interface { + // HoldWALRetention prevents WAL entries from startLSN from being recycled. + // Returns a release function that the caller MUST call when done. + HoldWALRetention(startLSN uint64) (release func(), err error) + + // HoldSnapshot prevents the checkpoint at checkpointLSN from being GC'd. + // Returns a release function. + HoldSnapshot(checkpointLSN uint64) (release func(), err error) + + // HoldFullBase holds a consistent full-extent image at committedLSN. + // Returns a release function. + HoldFullBase(committedLSN uint64) (release func(), err error) +} + +// BlockVolExecutor performs actual recovery I/O. Implemented by the +// weed-side bridge. It does NOT decide recovery policy — it only +// executes what the engine tells it to do. +type BlockVolExecutor interface { + // StreamWALEntries streams WAL entries from startExclusive+1 to endInclusive + // to the replica. Returns the highest LSN successfully transferred. + StreamWALEntries(startExclusive, endInclusive uint64) (transferredTo uint64, err error) + + // TransferSnapshot transfers a checkpoint/snapshot at snapshotLSN to the replica. + TransferSnapshot(snapshotLSN uint64) error + + // TransferFullBase transfers the full extent image to the replica. + TransferFullBase(committedLSN uint64) error + + // TruncateWAL removes entries beyond truncateLSN from the replica. + TruncateWAL(truncateLSN uint64) error +} diff --git a/sw-block/bridge/blockvol/storage_adapter.go b/sw-block/bridge/blockvol/storage_adapter.go index 9169abb97..9ef0cb0fd 100644 --- a/sw-block/bridge/blockvol/storage_adapter.go +++ b/sw-block/bridge/blockvol/storage_adapter.go @@ -8,118 +8,157 @@ import ( engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication" ) -// BlockVolState represents the real storage state from a blockvol instance. -// This is populated from actual blockvol fields, not reconstructed from -// metadata. Each field maps to a specific blockvol source: +// StorageAdapter implements engine.StorageAdapter by consuming +// BlockVolReader and BlockVolPinner interfaces. When backed by +// real implementations from weed/storage/blockvol/v2bridge/, +// all fields come from actual blockvol state. // -// WALHeadLSN ← vol.wal.HeadLSN() -// WALTailLSN ← vol.flusher.RetentionFloor() -// CommittedLSN ← vol.coordinator.CommittedLSN (from group commit) -// CheckpointLSN ← vol.superblock.CheckpointLSN -// CheckpointTrusted ← vol.superblock.Valid && checkpoint file exists -type BlockVolState struct { - WALHeadLSN uint64 - WALTailLSN uint64 - CommittedLSN uint64 - CheckpointLSN uint64 - CheckpointTrusted bool -} - -// StorageAdapter implements engine.StorageAdapter using real blockvol state. -// It does NOT decide recovery policy — it only exposes storage truth. +// For testing, use PushStorageAdapter (push-based, no blockvol dependency). type StorageAdapter struct { + reader BlockVolReader + pinner BlockVolPinner + mu sync.Mutex - state BlockVolState nextPinID atomic.Uint64 - // Pin tracking (real implementation would hold actual file/WAL references). - snapshotPins map[uint64]bool - walPins map[uint64]bool - fullBasePins map[uint64]bool + // Release functions keyed by pin ID. + releaseFuncs map[uint64]func() } -// NewStorageAdapter creates a storage adapter. The caller must update -// state via UpdateState when blockvol state changes. -func NewStorageAdapter() *StorageAdapter { +// NewStorageAdapter creates a storage adapter backed by real blockvol +// reader and pinner interfaces. +func NewStorageAdapter(reader BlockVolReader, pinner BlockVolPinner) *StorageAdapter { return &StorageAdapter{ - snapshotPins: map[uint64]bool{}, - walPins: map[uint64]bool{}, - fullBasePins: map[uint64]bool{}, + reader: reader, + pinner: pinner, + releaseFuncs: map[uint64]func(){}, } } -// UpdateState refreshes the adapter's view of blockvol state. -// Called when blockvol completes a checkpoint, advances WAL tail, etc. -func (sa *StorageAdapter) UpdateState(state BlockVolState) { - sa.mu.Lock() - defer sa.mu.Unlock() - sa.state = state -} - -// GetRetainedHistory returns the current WAL retention state from real -// blockvol fields. This is NOT reconstructed from test inputs. +// GetRetainedHistory reads real blockvol state via BlockVolReader. func (sa *StorageAdapter) GetRetainedHistory() engine.RetainedHistory { - sa.mu.Lock() - defer sa.mu.Unlock() + state := sa.reader.ReadState() return engine.RetainedHistory{ - HeadLSN: sa.state.WALHeadLSN, - TailLSN: sa.state.WALTailLSN, - CommittedLSN: sa.state.CommittedLSN, - CheckpointLSN: sa.state.CheckpointLSN, - CheckpointTrusted: sa.state.CheckpointTrusted, + HeadLSN: state.WALHeadLSN, + TailLSN: state.WALTailLSN, + CommittedLSN: state.CommittedLSN, + CheckpointLSN: state.CheckpointLSN, + CheckpointTrusted: state.CheckpointTrusted, } } -// PinSnapshot pins a checkpoint for rebuild use. +// PinSnapshot delegates to BlockVolPinner.HoldSnapshot. func (sa *StorageAdapter) PinSnapshot(checkpointLSN uint64) (engine.SnapshotPin, error) { - sa.mu.Lock() - defer sa.mu.Unlock() - if !sa.state.CheckpointTrusted || sa.state.CheckpointLSN != checkpointLSN { - return engine.SnapshotPin{}, fmt.Errorf("no valid checkpoint at LSN %d", checkpointLSN) + release, err := sa.pinner.HoldSnapshot(checkpointLSN) + if err != nil { + return engine.SnapshotPin{}, fmt.Errorf("snapshot pin at LSN %d: %w", checkpointLSN, err) } id := sa.nextPinID.Add(1) - sa.snapshotPins[id] = true + sa.mu.Lock() + sa.releaseFuncs[id] = release + sa.mu.Unlock() return engine.SnapshotPin{LSN: checkpointLSN, PinID: id, Valid: true}, nil } -// ReleaseSnapshot releases a pinned snapshot. +// ReleaseSnapshot calls the held release function. func (sa *StorageAdapter) ReleaseSnapshot(pin engine.SnapshotPin) { sa.mu.Lock() - defer sa.mu.Unlock() - delete(sa.snapshotPins, pin.PinID) + release := sa.releaseFuncs[pin.PinID] + delete(sa.releaseFuncs, pin.PinID) + sa.mu.Unlock() + if release != nil { + release() + } } -// PinWALRetention holds WAL entries from startLSN. +// PinWALRetention delegates to BlockVolPinner.HoldWALRetention. func (sa *StorageAdapter) PinWALRetention(startLSN uint64) (engine.RetentionPin, error) { - sa.mu.Lock() - defer sa.mu.Unlock() - if startLSN < sa.state.WALTailLSN { - return engine.RetentionPin{}, fmt.Errorf("WAL already recycled past LSN %d (tail=%d)", startLSN, sa.state.WALTailLSN) + release, err := sa.pinner.HoldWALRetention(startLSN) + if err != nil { + return engine.RetentionPin{}, fmt.Errorf("WAL retention pin at LSN %d: %w", startLSN, err) } id := sa.nextPinID.Add(1) - sa.walPins[id] = true + sa.mu.Lock() + sa.releaseFuncs[id] = release + sa.mu.Unlock() return engine.RetentionPin{StartLSN: startLSN, PinID: id, Valid: true}, nil } -// ReleaseWALRetention releases a WAL retention hold. +// ReleaseWALRetention calls the held release function. func (sa *StorageAdapter) ReleaseWALRetention(pin engine.RetentionPin) { sa.mu.Lock() - defer sa.mu.Unlock() - delete(sa.walPins, pin.PinID) + release := sa.releaseFuncs[pin.PinID] + delete(sa.releaseFuncs, pin.PinID) + sa.mu.Unlock() + if release != nil { + release() + } } -// PinFullBase pins a full-extent base image for full-base rebuild. +// PinFullBase delegates to BlockVolPinner.HoldFullBase. func (sa *StorageAdapter) PinFullBase(committedLSN uint64) (engine.FullBasePin, error) { - sa.mu.Lock() - defer sa.mu.Unlock() + release, err := sa.pinner.HoldFullBase(committedLSN) + if err != nil { + return engine.FullBasePin{}, fmt.Errorf("full base pin at LSN %d: %w", committedLSN, err) + } id := sa.nextPinID.Add(1) - sa.fullBasePins[id] = true + sa.mu.Lock() + sa.releaseFuncs[id] = release + sa.mu.Unlock() return engine.FullBasePin{CommittedLSN: committedLSN, PinID: id, Valid: true}, nil } -// ReleaseFullBase releases a pinned full base image. +// ReleaseFullBase calls the held release function. func (sa *StorageAdapter) ReleaseFullBase(pin engine.FullBasePin) { sa.mu.Lock() - defer sa.mu.Unlock() - delete(sa.fullBasePins, pin.PinID) + release := sa.releaseFuncs[pin.PinID] + delete(sa.releaseFuncs, pin.PinID) + sa.mu.Unlock() + if release != nil { + release() + } +} + +// PushStorageAdapter is a test-only adapter that uses push-based state +// updates instead of pulling from a BlockVolReader. For use in tests +// that don't have real blockvol instances. +type PushStorageAdapter struct { + *StorageAdapter + state BlockVolState +} + +// NewPushStorageAdapter creates a push-based adapter for tests. +func NewPushStorageAdapter() *PushStorageAdapter { + psa := &PushStorageAdapter{} + psa.StorageAdapter = NewStorageAdapter(&pushReader{psa: psa}, &pushPinner{psa: psa}) + return psa +} + +// UpdateState sets the adapter's state (push model for tests). +func (psa *PushStorageAdapter) UpdateState(state BlockVolState) { + psa.state = state +} + +type pushReader struct{ psa *PushStorageAdapter } + +func (pr *pushReader) ReadState() BlockVolState { return pr.psa.state } + +type pushPinner struct{ psa *PushStorageAdapter } + +func (pp *pushPinner) HoldWALRetention(startLSN uint64) (func(), error) { + if startLSN < pp.psa.state.WALTailLSN { + return nil, fmt.Errorf("WAL recycled past %d (tail=%d)", startLSN, pp.psa.state.WALTailLSN) + } + return func() {}, nil +} + +func (pp *pushPinner) HoldSnapshot(checkpointLSN uint64) (func(), error) { + if !pp.psa.state.CheckpointTrusted || pp.psa.state.CheckpointLSN != checkpointLSN { + return nil, fmt.Errorf("no trusted checkpoint at %d", checkpointLSN) + } + return func() {}, nil +} + +func (pp *pushPinner) HoldFullBase(_ uint64) (func(), error) { + return func() {}, nil }