Browse Source

feat: add contract interfaces and pin/release via release-func pattern (Phase 07 P1)

E5 handoff contract (contract.go):
- BlockVolReader: ReadState() → BlockVolState from real blockvol
- BlockVolPinner: HoldWALRetention/HoldSnapshot/HoldFullBase → release func
- BlockVolExecutor: StreamWALEntries/TransferSnapshot/TransferFullBase/TruncateWAL
- Clear import direction: weed-side imports sw-block, not reverse

StorageAdapter refactored:
- Consumes BlockVolReader + BlockVolPinner interfaces
- Pin/release uses release-func pattern (not map-based tracking)
- PushStorageAdapter for tests (push-based, no blockvol dependency)

10 bridge tests:
- 4 control adapter (identity, address change, role mapping, primary)
- 4 storage adapter (retained history, WAL pin reject, snapshot reject, symmetry)
- 1 E2E (assignment → adapter → engine → plan → execute → InSync)
- 1 contract interface verification

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 21 hours ago
parent
commit
8c326c871c
  1. 190
      sw-block/bridge/blockvol/bridge_test.go
  2. 82
      sw-block/bridge/blockvol/contract.go
  3. 181
      sw-block/bridge/blockvol/storage_adapter.go

190
sw-block/bridge/blockvol/bridge_test.go

@ -7,54 +7,26 @@ import (
)
// ============================================================
// Phase 07 P0: Bridge adapter tests
// Validates E1-E3 expectations against concrete adapter code.
// Phase 07 P0/P1: Bridge adapter tests
// ============================================================
// --- E1: Real assignment → engine intent ---
// --- E1: Stable identity ---
func TestControlAdapter_StableIdentity(t *testing.T) {
ca := NewControlAdapter()
primary := MasterAssignment{
VolumeName: "pvc-data-1",
Epoch: 3,
Role: "primary",
PrimaryServerID: "vs1",
}
replicas := []MasterAssignment{
{
VolumeName: "pvc-data-1",
Epoch: 3,
Role: "replica",
ReplicaServerID: "vs2",
DataAddr: "10.0.0.2:9333",
CtrlAddr: "10.0.0.2:9334",
AddrVersion: 1,
intent := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "pvc-data-1", Epoch: 3, Role: "primary", PrimaryServerID: "vs1"},
[]MasterAssignment{
{VolumeName: "pvc-data-1", Epoch: 3, Role: "replica", ReplicaServerID: "vs2",
DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", AddrVersion: 1},
},
}
intent := ca.ToAssignmentIntent(primary, replicas)
if intent.Epoch != 3 {
t.Fatalf("epoch=%d", intent.Epoch)
}
if len(intent.Replicas) != 1 {
t.Fatalf("replicas=%d", len(intent.Replicas))
}
)
// ReplicaID is stable: volume-name/server-id (NOT address).
r := intent.Replicas[0]
if r.ReplicaID != "pvc-data-1/vs2" {
t.Fatalf("ReplicaID=%s (must be volume/server, not address)", r.ReplicaID)
}
// Endpoint is the address (mutable).
if r.Endpoint.DataAddr != "10.0.0.2:9333" {
t.Fatalf("DataAddr=%s", r.Endpoint.DataAddr)
t.Fatalf("ReplicaID=%s (must be volume/server)", r.ReplicaID)
}
// Recovery target mapped.
if intent.RecoveryTargets["pvc-data-1/vs2"] != engine.SessionCatchUp {
t.Fatalf("recovery=%s", intent.RecoveryTargets["pvc-data-1/vs2"])
}
@ -63,129 +35,113 @@ func TestControlAdapter_StableIdentity(t *testing.T) {
func TestControlAdapter_AddressChangePreservesIdentity(t *testing.T) {
ca := NewControlAdapter()
// First assignment.
intent1 := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
[]MasterAssignment{
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1},
},
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
[]MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1}},
)
// Address changes — new assignment.
intent2 := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
[]MasterAssignment{
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2},
},
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
[]MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2}},
)
// Same ReplicaID despite different address.
if intent1.Replicas[0].ReplicaID != intent2.Replicas[0].ReplicaID {
t.Fatalf("identity changed: %s → %s",
intent1.Replicas[0].ReplicaID, intent2.Replicas[0].ReplicaID)
}
// Endpoint updated.
if intent2.Replicas[0].Endpoint.DataAddr != "10.0.0.3:9333" {
t.Fatalf("endpoint not updated: %s", intent2.Replicas[0].Endpoint.DataAddr)
t.Fatal("identity changed")
}
}
func TestControlAdapter_RebuildRoleMapping(t *testing.T) {
ca := NewControlAdapter()
intent := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
[]MasterAssignment{
{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"},
},
[]MasterAssignment{{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"}},
)
if intent.RecoveryTargets["vol1/vs2"] != engine.SessionRebuild {
t.Fatalf("rebuilding role should map to SessionRebuild, got %s",
intent.RecoveryTargets["vol1/vs2"])
t.Fatalf("got %s", intent.RecoveryTargets["vol1/vs2"])
}
}
func TestControlAdapter_PrimaryNoRecovery(t *testing.T) {
ca := NewControlAdapter()
intent := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
[]MasterAssignment{}, // no replicas
[]MasterAssignment{},
)
if len(intent.RecoveryTargets) != 0 {
t.Fatal("primary should not have recovery targets")
}
}
// --- E2: Real storage truth → RetainedHistory ---
// --- E2: Storage adapter via contract interfaces ---
func TestStorageAdapter_RetainedHistoryFromRealState(t *testing.T) {
sa := NewStorageAdapter()
sa.UpdateState(BlockVolState{
WALHeadLSN: 100,
WALTailLSN: 30,
CommittedLSN: 90,
CheckpointLSN: 50,
CheckpointTrusted: true,
func TestStorageAdapter_RetainedHistoryFromReader(t *testing.T) {
psa := NewPushStorageAdapter()
psa.UpdateState(BlockVolState{
WALHeadLSN: 100, WALTailLSN: 30, CommittedLSN: 90,
CheckpointLSN: 50, CheckpointTrusted: true,
})
rh := sa.GetRetainedHistory()
if rh.HeadLSN != 100 {
t.Fatalf("HeadLSN=%d", rh.HeadLSN)
}
if rh.TailLSN != 30 {
t.Fatalf("TailLSN=%d", rh.TailLSN)
rh := psa.GetRetainedHistory()
if rh.HeadLSN != 100 || rh.TailLSN != 30 || rh.CommittedLSN != 90 {
t.Fatalf("head=%d tail=%d committed=%d", rh.HeadLSN, rh.TailLSN, rh.CommittedLSN)
}
if rh.CommittedLSN != 90 {
t.Fatalf("CommittedLSN=%d", rh.CommittedLSN)
}
if rh.CheckpointLSN != 50 {
t.Fatalf("CheckpointLSN=%d", rh.CheckpointLSN)
}
if !rh.CheckpointTrusted {
t.Fatal("CheckpointTrusted should be true")
if rh.CheckpointLSN != 50 || !rh.CheckpointTrusted {
t.Fatalf("checkpoint=%d trusted=%v", rh.CheckpointLSN, rh.CheckpointTrusted)
}
}
func TestStorageAdapter_WALPinRejectsRecycled(t *testing.T) {
sa := NewStorageAdapter()
sa.UpdateState(BlockVolState{WALTailLSN: 50})
psa := NewPushStorageAdapter()
psa.UpdateState(BlockVolState{WALTailLSN: 50})
_, err := sa.PinWALRetention(30) // 30 < tail 50
_, err := psa.PinWALRetention(30)
if err == nil {
t.Fatal("WAL pin should be rejected when range is recycled")
t.Fatal("should reject recycled range")
}
}
func TestStorageAdapter_SnapshotPinRejectsInvalid(t *testing.T) {
sa := NewStorageAdapter()
sa.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false})
func TestStorageAdapter_SnapshotPinRejectsUntrusted(t *testing.T) {
psa := NewPushStorageAdapter()
psa.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false})
_, err := sa.PinSnapshot(50)
_, err := psa.PinSnapshot(50)
if err == nil {
t.Fatal("snapshot pin should be rejected when checkpoint is untrusted")
t.Fatal("should reject untrusted checkpoint")
}
}
// --- E3: Engine integration through bridge ---
func TestStorageAdapter_PinReleaseSymmetry(t *testing.T) {
psa := NewPushStorageAdapter()
psa.UpdateState(BlockVolState{WALTailLSN: 0, CheckpointLSN: 50, CheckpointTrusted: true})
walPin, _ := psa.PinWALRetention(10)
snapPin, _ := psa.PinSnapshot(50)
basePin, _ := psa.PinFullBase(100)
// Pins tracked.
if len(psa.releaseFuncs) != 3 {
t.Fatalf("pins=%d", len(psa.releaseFuncs))
}
// Release all.
psa.ReleaseWALRetention(walPin)
psa.ReleaseSnapshot(snapPin)
psa.ReleaseFullBase(basePin)
if len(psa.releaseFuncs) != 0 {
t.Fatalf("leaked pins=%d", len(psa.releaseFuncs))
}
}
// --- E3: End-to-end bridge flow ---
func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
// Full bridge flow: master assignment → adapter → engine.
ca := NewControlAdapter()
sa := NewStorageAdapter()
sa.UpdateState(BlockVolState{
WALHeadLSN: 100,
WALTailLSN: 30,
CommittedLSN: 100,
CheckpointLSN: 50,
CheckpointTrusted: true,
psa := NewPushStorageAdapter()
psa.UpdateState(BlockVolState{
WALHeadLSN: 100, WALTailLSN: 30, CommittedLSN: 100,
CheckpointLSN: 50, CheckpointTrusted: true,
})
// Step 1: master assignment → engine intent.
intent := ca.ToAssignmentIntent(
MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
[]MasterAssignment{
@ -194,11 +150,9 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
},
)
// Step 2: engine processes intent.
drv := engine.NewRecoveryDriver(sa)
drv := engine.NewRecoveryDriver(psa)
drv.Orchestrator.ProcessAssignment(intent)
// Step 3: plan recovery from real storage state.
plan, err := drv.PlanRecovery("vol1/vs2", 70)
if err != nil {
t.Fatal(err)
@ -206,11 +160,7 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
if plan.Outcome != engine.OutcomeCatchUp {
t.Fatalf("outcome=%s", plan.Outcome)
}
if !plan.Proof.Recoverable {
t.Fatalf("proof: %s", plan.Proof.Reason)
}
// Step 4: execute through engine executor.
exec := engine.NewCatchUpExecutor(drv, plan)
if err := exec.Execute([]uint64{80, 90, 100}, 0); err != nil {
t.Fatal(err)
@ -220,3 +170,11 @@ func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
t.Fatalf("state=%s", drv.Orchestrator.Registry.Sender("vol1/vs2").State())
}
}
// --- E5: Contract interface boundary ---
func TestContract_BlockVolReaderInterface(t *testing.T) {
// Verify the contract interface is implementable.
var _ BlockVolReader = &pushReader{psa: NewPushStorageAdapter()}
var _ BlockVolPinner = &pushPinner{psa: NewPushStorageAdapter()}
}

82
sw-block/bridge/blockvol/contract.go

@ -0,0 +1,82 @@
package blockvol
// === Phase 07 P1: Handoff contract ===
//
// This file defines the interface boundary between:
// - sw-block/bridge/blockvol/ (engine-side, no weed imports)
// - weed/storage/blockvol/v2bridge/ (weed-side, real blockvol imports)
//
// The engine-side bridge defines WHAT the weed-side must provide.
// The weed-side bridge implements HOW using real blockvol internals.
//
// Import direction:
// weed/storage/blockvol/v2bridge/ → imports → sw-block/bridge/blockvol/
// weed/storage/blockvol/v2bridge/ → imports → sw-block/engine/replication/
// weed/storage/blockvol/v2bridge/ → imports → weed/storage/blockvol/
// sw-block/bridge/blockvol/ → imports → sw-block/engine/replication/
// sw-block/bridge/blockvol/ does NOT import weed/
// BlockVolState represents the real storage state from a blockvol instance.
// Each field maps to a specific blockvol source:
//
// WALHeadLSN ← vol.nextLSN - 1 or vol.Status().WALHeadLSN
// WALTailLSN ← vol.flusher.RetentionFloor()
// CommittedLSN ← vol.distCommit.CommittedLSN()
// CheckpointLSN ← vol.flusher.CheckpointLSN()
// CheckpointTrusted ← vol.superblock.Valid + checkpoint file exists
type BlockVolState struct {
WALHeadLSN uint64
WALTailLSN uint64
CommittedLSN uint64
CheckpointLSN uint64
CheckpointTrusted bool
}
// BlockVolReader reads real blockvol state. Implemented by the weed-side
// bridge using actual blockvol struct fields. The engine-side bridge
// consumes this interface via the StorageAdapter.
type BlockVolReader interface {
// ReadState returns the current blockvol state snapshot.
// Must read from real blockvol fields:
// WALHeadLSN ← vol.nextLSN - 1 or vol.Status().WALHeadLSN
// WALTailLSN ← vol.flusher.RetentionFloor()
// CommittedLSN ← vol.distCommit.CommittedLSN()
// CheckpointLSN ← vol.flusher.CheckpointLSN()
// CheckpointTrusted ← superblock valid + checkpoint file exists
ReadState() BlockVolState
}
// BlockVolPinner manages real resource holds against WAL reclaim and
// checkpoint GC. Implemented by the weed-side bridge using actual
// blockvol retention machinery.
type BlockVolPinner interface {
// HoldWALRetention prevents WAL entries from startLSN from being recycled.
// Returns a release function that the caller MUST call when done.
HoldWALRetention(startLSN uint64) (release func(), err error)
// HoldSnapshot prevents the checkpoint at checkpointLSN from being GC'd.
// Returns a release function.
HoldSnapshot(checkpointLSN uint64) (release func(), err error)
// HoldFullBase holds a consistent full-extent image at committedLSN.
// Returns a release function.
HoldFullBase(committedLSN uint64) (release func(), err error)
}
// BlockVolExecutor performs actual recovery I/O. Implemented by the
// weed-side bridge. It does NOT decide recovery policy — it only
// executes what the engine tells it to do.
type BlockVolExecutor interface {
// StreamWALEntries streams WAL entries from startExclusive+1 to endInclusive
// to the replica. Returns the highest LSN successfully transferred.
StreamWALEntries(startExclusive, endInclusive uint64) (transferredTo uint64, err error)
// TransferSnapshot transfers a checkpoint/snapshot at snapshotLSN to the replica.
TransferSnapshot(snapshotLSN uint64) error
// TransferFullBase transfers the full extent image to the replica.
TransferFullBase(committedLSN uint64) error
// TruncateWAL removes entries beyond truncateLSN from the replica.
TruncateWAL(truncateLSN uint64) error
}

181
sw-block/bridge/blockvol/storage_adapter.go

@ -8,118 +8,157 @@ import (
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
)
// BlockVolState represents the real storage state from a blockvol instance.
// This is populated from actual blockvol fields, not reconstructed from
// metadata. Each field maps to a specific blockvol source:
// StorageAdapter implements engine.StorageAdapter by consuming
// BlockVolReader and BlockVolPinner interfaces. When backed by
// real implementations from weed/storage/blockvol/v2bridge/,
// all fields come from actual blockvol state.
//
// WALHeadLSN ← vol.wal.HeadLSN()
// WALTailLSN ← vol.flusher.RetentionFloor()
// CommittedLSN ← vol.coordinator.CommittedLSN (from group commit)
// CheckpointLSN ← vol.superblock.CheckpointLSN
// CheckpointTrusted ← vol.superblock.Valid && checkpoint file exists
type BlockVolState struct {
WALHeadLSN uint64
WALTailLSN uint64
CommittedLSN uint64
CheckpointLSN uint64
CheckpointTrusted bool
}
// StorageAdapter implements engine.StorageAdapter using real blockvol state.
// It does NOT decide recovery policy — it only exposes storage truth.
// For testing, use PushStorageAdapter (push-based, no blockvol dependency).
type StorageAdapter struct {
reader BlockVolReader
pinner BlockVolPinner
mu sync.Mutex
state BlockVolState
nextPinID atomic.Uint64
// Pin tracking (real implementation would hold actual file/WAL references).
snapshotPins map[uint64]bool
walPins map[uint64]bool
fullBasePins map[uint64]bool
// Release functions keyed by pin ID.
releaseFuncs map[uint64]func()
}
// NewStorageAdapter creates a storage adapter. The caller must update
// state via UpdateState when blockvol state changes.
func NewStorageAdapter() *StorageAdapter {
// NewStorageAdapter creates a storage adapter backed by real blockvol
// reader and pinner interfaces.
func NewStorageAdapter(reader BlockVolReader, pinner BlockVolPinner) *StorageAdapter {
return &StorageAdapter{
snapshotPins: map[uint64]bool{},
walPins: map[uint64]bool{},
fullBasePins: map[uint64]bool{},
reader: reader,
pinner: pinner,
releaseFuncs: map[uint64]func(){},
}
}
// UpdateState refreshes the adapter's view of blockvol state.
// Called when blockvol completes a checkpoint, advances WAL tail, etc.
func (sa *StorageAdapter) UpdateState(state BlockVolState) {
sa.mu.Lock()
defer sa.mu.Unlock()
sa.state = state
}
// GetRetainedHistory returns the current WAL retention state from real
// blockvol fields. This is NOT reconstructed from test inputs.
// GetRetainedHistory reads real blockvol state via BlockVolReader.
func (sa *StorageAdapter) GetRetainedHistory() engine.RetainedHistory {
sa.mu.Lock()
defer sa.mu.Unlock()
state := sa.reader.ReadState()
return engine.RetainedHistory{
HeadLSN: sa.state.WALHeadLSN,
TailLSN: sa.state.WALTailLSN,
CommittedLSN: sa.state.CommittedLSN,
CheckpointLSN: sa.state.CheckpointLSN,
CheckpointTrusted: sa.state.CheckpointTrusted,
HeadLSN: state.WALHeadLSN,
TailLSN: state.WALTailLSN,
CommittedLSN: state.CommittedLSN,
CheckpointLSN: state.CheckpointLSN,
CheckpointTrusted: state.CheckpointTrusted,
}
}
// PinSnapshot pins a checkpoint for rebuild use.
// PinSnapshot delegates to BlockVolPinner.HoldSnapshot.
func (sa *StorageAdapter) PinSnapshot(checkpointLSN uint64) (engine.SnapshotPin, error) {
sa.mu.Lock()
defer sa.mu.Unlock()
if !sa.state.CheckpointTrusted || sa.state.CheckpointLSN != checkpointLSN {
return engine.SnapshotPin{}, fmt.Errorf("no valid checkpoint at LSN %d", checkpointLSN)
release, err := sa.pinner.HoldSnapshot(checkpointLSN)
if err != nil {
return engine.SnapshotPin{}, fmt.Errorf("snapshot pin at LSN %d: %w", checkpointLSN, err)
}
id := sa.nextPinID.Add(1)
sa.snapshotPins[id] = true
sa.mu.Lock()
sa.releaseFuncs[id] = release
sa.mu.Unlock()
return engine.SnapshotPin{LSN: checkpointLSN, PinID: id, Valid: true}, nil
}
// ReleaseSnapshot releases a pinned snapshot.
// ReleaseSnapshot calls the held release function.
func (sa *StorageAdapter) ReleaseSnapshot(pin engine.SnapshotPin) {
sa.mu.Lock()
defer sa.mu.Unlock()
delete(sa.snapshotPins, pin.PinID)
release := sa.releaseFuncs[pin.PinID]
delete(sa.releaseFuncs, pin.PinID)
sa.mu.Unlock()
if release != nil {
release()
}
}
// PinWALRetention holds WAL entries from startLSN.
// PinWALRetention delegates to BlockVolPinner.HoldWALRetention.
func (sa *StorageAdapter) PinWALRetention(startLSN uint64) (engine.RetentionPin, error) {
sa.mu.Lock()
defer sa.mu.Unlock()
if startLSN < sa.state.WALTailLSN {
return engine.RetentionPin{}, fmt.Errorf("WAL already recycled past LSN %d (tail=%d)", startLSN, sa.state.WALTailLSN)
release, err := sa.pinner.HoldWALRetention(startLSN)
if err != nil {
return engine.RetentionPin{}, fmt.Errorf("WAL retention pin at LSN %d: %w", startLSN, err)
}
id := sa.nextPinID.Add(1)
sa.walPins[id] = true
sa.mu.Lock()
sa.releaseFuncs[id] = release
sa.mu.Unlock()
return engine.RetentionPin{StartLSN: startLSN, PinID: id, Valid: true}, nil
}
// ReleaseWALRetention releases a WAL retention hold.
// ReleaseWALRetention calls the held release function.
func (sa *StorageAdapter) ReleaseWALRetention(pin engine.RetentionPin) {
sa.mu.Lock()
defer sa.mu.Unlock()
delete(sa.walPins, pin.PinID)
release := sa.releaseFuncs[pin.PinID]
delete(sa.releaseFuncs, pin.PinID)
sa.mu.Unlock()
if release != nil {
release()
}
}
// PinFullBase pins a full-extent base image for full-base rebuild.
// PinFullBase delegates to BlockVolPinner.HoldFullBase.
func (sa *StorageAdapter) PinFullBase(committedLSN uint64) (engine.FullBasePin, error) {
sa.mu.Lock()
defer sa.mu.Unlock()
release, err := sa.pinner.HoldFullBase(committedLSN)
if err != nil {
return engine.FullBasePin{}, fmt.Errorf("full base pin at LSN %d: %w", committedLSN, err)
}
id := sa.nextPinID.Add(1)
sa.fullBasePins[id] = true
sa.mu.Lock()
sa.releaseFuncs[id] = release
sa.mu.Unlock()
return engine.FullBasePin{CommittedLSN: committedLSN, PinID: id, Valid: true}, nil
}
// ReleaseFullBase releases a pinned full base image.
// ReleaseFullBase calls the held release function.
func (sa *StorageAdapter) ReleaseFullBase(pin engine.FullBasePin) {
sa.mu.Lock()
defer sa.mu.Unlock()
delete(sa.fullBasePins, pin.PinID)
release := sa.releaseFuncs[pin.PinID]
delete(sa.releaseFuncs, pin.PinID)
sa.mu.Unlock()
if release != nil {
release()
}
}
// PushStorageAdapter is a test-only adapter that uses push-based state
// updates instead of pulling from a BlockVolReader. For use in tests
// that don't have real blockvol instances.
type PushStorageAdapter struct {
*StorageAdapter
state BlockVolState
}
// NewPushStorageAdapter creates a push-based adapter for tests.
func NewPushStorageAdapter() *PushStorageAdapter {
psa := &PushStorageAdapter{}
psa.StorageAdapter = NewStorageAdapter(&pushReader{psa: psa}, &pushPinner{psa: psa})
return psa
}
// UpdateState sets the adapter's state (push model for tests).
func (psa *PushStorageAdapter) UpdateState(state BlockVolState) {
psa.state = state
}
type pushReader struct{ psa *PushStorageAdapter }
func (pr *pushReader) ReadState() BlockVolState { return pr.psa.state }
type pushPinner struct{ psa *PushStorageAdapter }
func (pp *pushPinner) HoldWALRetention(startLSN uint64) (func(), error) {
if startLSN < pp.psa.state.WALTailLSN {
return nil, fmt.Errorf("WAL recycled past %d (tail=%d)", startLSN, pp.psa.state.WALTailLSN)
}
return func() {}, nil
}
func (pp *pushPinner) HoldSnapshot(checkpointLSN uint64) (func(), error) {
if !pp.psa.state.CheckpointTrusted || pp.psa.state.CheckpointLSN != checkpointLSN {
return nil, fmt.Errorf("no trusted checkpoint at %d", checkpointLSN)
}
return func() {}, nil
}
func (pp *pushPinner) HoldFullBase(_ uint64) (func(), error) {
return func() {}, nil
}
Loading…
Cancel
Save