Browse Source

feat: add V2 bridge adapters for blockvol (Phase 07 P0)

Creates sw-block/bridge/blockvol/ — concrete adapters connecting
the V2 engine to real blockvol storage and control-plane state.

control_adapter.go:
- MakeReplicaID: volume-name/server-id (NOT address-derived)
- ToAssignmentIntent: maps master assignment → engine intent
- Role → SessionKind translation (pure mapping, no policy)

storage_adapter.go:
- BlockVolState: maps to real blockvol fields (WAL head/tail,
  committed, checkpoint) — NOT reconstructed from metadata
- GetRetainedHistory from real state
- PinSnapshot rejects untrusted checkpoint
- PinWALRetention rejects recycled range
- PinFullBase / ReleaseFullBase

8 bridge tests:
- StableIdentity: ReplicaID = vol/server (not address)
- AddressChangePreservesIdentity: same ID, different address
- RebuildRoleMapping: "rebuilding" → SessionRebuild
- PrimaryNoRecovery: no recovery targets for primary
- RetainedHistoryFromRealState: all fields from BlockVolState
- WALPinRejectsRecycled: tail validation
- SnapshotPinRejectsInvalid: trust validation
- E2E_AssignmentToRecovery: master assignment → adapter →
  engine intent → plan → execute → InSync

Adapter replacement order:
P0: control_adapter + storage_adapter (this delivery)
P1: executor_bridge + observe_adapter (deferred)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 21 hours ago
parent
commit
05daede7f9
  1. 222
      sw-block/bridge/blockvol/bridge_test.go
  2. 83
      sw-block/bridge/blockvol/control_adapter.go
  3. 19
      sw-block/bridge/blockvol/doc.go
  4. 7
      sw-block/bridge/blockvol/go.mod
  5. 125
      sw-block/bridge/blockvol/storage_adapter.go

222
sw-block/bridge/blockvol/bridge_test.go

@@ -0,0 +1,222 @@
package blockvol
import (
"testing"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
)
// ============================================================
// Phase 07 P0: Bridge adapter tests
// Validates E1-E3 expectations against concrete adapter code.
// ============================================================
// --- E1: Real assignment → engine intent ---
// TestControlAdapter_StableIdentity verifies that a replica's engine
// identity is derived from volume name + server ID, never from its
// network address (expectation E1).
func TestControlAdapter_StableIdentity(t *testing.T) {
	adapter := NewControlAdapter()

	primaryAssign := MasterAssignment{
		VolumeName:      "pvc-data-1",
		Epoch:           3,
		Role:            "primary",
		PrimaryServerID: "vs1",
	}
	replicaAssigns := []MasterAssignment{{
		VolumeName:      "pvc-data-1",
		Epoch:           3,
		Role:            "replica",
		ReplicaServerID: "vs2",
		DataAddr:        "10.0.0.2:9333",
		CtrlAddr:        "10.0.0.2:9334",
		AddrVersion:     1,
	}}

	got := adapter.ToAssignmentIntent(primaryAssign, replicaAssigns)

	if got.Epoch != 3 {
		t.Fatalf("epoch=%d", got.Epoch)
	}
	if n := len(got.Replicas); n != 1 {
		t.Fatalf("replicas=%d", n)
	}
	rep := got.Replicas[0]
	// Identity must be stable: volume-name/server-id, never the address.
	if rep.ReplicaID != "pvc-data-1/vs2" {
		t.Fatalf("ReplicaID=%s (must be volume/server, not address)", rep.ReplicaID)
	}
	// The (mutable) address lives on the endpoint, not the identity.
	if rep.Endpoint.DataAddr != "10.0.0.2:9333" {
		t.Fatalf("DataAddr=%s", rep.Endpoint.DataAddr)
	}
	// The "replica" role must translate into a catch-up recovery target.
	if got.RecoveryTargets["pvc-data-1/vs2"] != engine.SessionCatchUp {
		t.Fatalf("recovery=%s", got.RecoveryTargets["pvc-data-1/vs2"])
	}
}
// TestControlAdapter_AddressChangePreservesIdentity checks that a replica
// keeps the same ReplicaID when only its address (and AddrVersion) change.
func TestControlAdapter_AddressChangePreservesIdentity(t *testing.T) {
	adapter := NewControlAdapter()
	primary := MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"}

	// Initial assignment at the first address.
	before := adapter.ToAssignmentIntent(primary, []MasterAssignment{
		{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1},
	})
	// The replica moved: same server identity, new address, bumped version.
	after := adapter.ToAssignmentIntent(primary, []MasterAssignment{
		{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2},
	})

	// Identity must survive the address change.
	if before.Replicas[0].ReplicaID != after.Replicas[0].ReplicaID {
		t.Fatalf("identity changed: %s → %s",
			before.Replicas[0].ReplicaID, after.Replicas[0].ReplicaID)
	}
	// The endpoint, however, must reflect the new address.
	if after.Replicas[0].Endpoint.DataAddr != "10.0.0.3:9333" {
		t.Fatalf("endpoint not updated: %s", after.Replicas[0].Endpoint.DataAddr)
	}
}
// TestControlAdapter_RebuildRoleMapping checks the pure role translation:
// a master-assigned "rebuilding" role becomes engine.SessionRebuild.
func TestControlAdapter_RebuildRoleMapping(t *testing.T) {
	adapter := NewControlAdapter()
	got := adapter.ToAssignmentIntent(
		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
		[]MasterAssignment{
			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"},
		},
	)
	if kind := got.RecoveryTargets["vol1/vs2"]; kind != engine.SessionRebuild {
		t.Fatalf("rebuilding role should map to SessionRebuild, got %s", kind)
	}
}
// TestControlAdapter_PrimaryNoRecovery checks that an assignment with no
// replicas produces no recovery targets at all.
func TestControlAdapter_PrimaryNoRecovery(t *testing.T) {
	adapter := NewControlAdapter()
	got := adapter.ToAssignmentIntent(
		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
		[]MasterAssignment{}, // no replicas
	)
	if len(got.RecoveryTargets) != 0 {
		t.Fatal("primary should not have recovery targets")
	}
}
// --- E2: Real storage truth → RetainedHistory ---
// TestStorageAdapter_RetainedHistoryFromRealState verifies that every
// RetainedHistory field is copied from the supplied BlockVolState (E2).
func TestStorageAdapter_RetainedHistoryFromRealState(t *testing.T) {
	adapter := NewStorageAdapter()
	adapter.UpdateState(BlockVolState{
		WALHeadLSN:        100,
		WALTailLSN:        30,
		CommittedLSN:      90,
		CheckpointLSN:     50,
		CheckpointTrusted: true,
	})
	rh := adapter.GetRetainedHistory()
	// Each LSN field must come straight from the blockvol state above.
	lsnChecks := []struct {
		field string
		got   uint64
		want  uint64
	}{
		{"HeadLSN", rh.HeadLSN, 100},
		{"TailLSN", rh.TailLSN, 30},
		{"CommittedLSN", rh.CommittedLSN, 90},
		{"CheckpointLSN", rh.CheckpointLSN, 50},
	}
	for _, c := range lsnChecks {
		if c.got != c.want {
			t.Fatalf("%s=%d", c.field, c.got)
		}
	}
	if !rh.CheckpointTrusted {
		t.Fatal("CheckpointTrusted should be true")
	}
}
// TestStorageAdapter_WALPinRejectsRecycled checks that a WAL retention pin
// below the current retention floor (tail) is refused.
func TestStorageAdapter_WALPinRejectsRecycled(t *testing.T) {
	adapter := NewStorageAdapter()
	adapter.UpdateState(BlockVolState{WALTailLSN: 50})
	// LSN 30 precedes the retention floor (tail=50): must be refused.
	if _, err := adapter.PinWALRetention(30); err == nil {
		t.Fatal("WAL pin should be rejected when range is recycled")
	}
}
// TestStorageAdapter_SnapshotPinRejectsInvalid checks that pinning an
// untrusted checkpoint fails even when the LSN matches.
func TestStorageAdapter_SnapshotPinRejectsInvalid(t *testing.T) {
	adapter := NewStorageAdapter()
	adapter.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false})
	// A checkpoint exists at LSN 50 but is untrusted: the pin must fail.
	if _, err := adapter.PinSnapshot(50); err == nil {
		t.Fatal("snapshot pin should be rejected when checkpoint is untrusted")
	}
}
// --- E3: Engine integration through bridge ---
// TestBridge_E2E_AssignmentToRecovery walks the full bridge path (E3):
// master assignment → adapters → engine intent → recovery plan →
// executor → replica InSync.
func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
	control := NewControlAdapter()
	storage := NewStorageAdapter()
	storage.UpdateState(BlockVolState{
		WALHeadLSN:        100,
		WALTailLSN:        30,
		CommittedLSN:      100,
		CheckpointLSN:     50,
		CheckpointTrusted: true,
	})

	// Step 1: translate the master assignment into an engine intent.
	intent := control.ToAssignmentIntent(
		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
		[]MasterAssignment{
			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica",
				DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", AddrVersion: 1},
		},
	)

	// Step 2: feed the intent into the engine.
	driver := engine.NewRecoveryDriver(storage)
	driver.Orchestrator.ProcessAssignment(intent)

	// Step 3: plan recovery from the real storage state.
	plan, err := driver.PlanRecovery("vol1/vs2", 70)
	if err != nil {
		t.Fatal(err)
	}
	if plan.Outcome != engine.OutcomeCatchUp {
		t.Fatalf("outcome=%s", plan.Outcome)
	}
	if !plan.Proof.Recoverable {
		t.Fatalf("proof: %s", plan.Proof.Reason)
	}

	// Step 4: execute the catch-up and confirm the replica reaches InSync.
	executor := engine.NewCatchUpExecutor(driver, plan)
	if err := executor.Execute([]uint64{80, 90, 100}, 0); err != nil {
		t.Fatal(err)
	}
	if state := driver.Orchestrator.Registry.Sender("vol1/vs2").State(); state != engine.StateInSync {
		t.Fatalf("state=%s", state)
	}
}

83
sw-block/bridge/blockvol/control_adapter.go

@@ -0,0 +1,83 @@
package blockvol
import (
"fmt"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
)
// MasterAssignment represents a block-volume assignment from the master,
// as delivered via heartbeat response. This is the raw input from the
// existing master_grpc_server / block_heartbeat_loop path.
//
// Identity fields (VolumeName, PrimaryServerID, ReplicaServerID) are
// stable; address fields (DataAddr, CtrlAddr, AddrVersion) are mutable
// and may change between assignments for the same replica.
type MasterAssignment struct {
	VolumeName      string // e.g., "pvc-data-1"
	Epoch           uint64 // assignment epoch from the master
	Role            string // "primary", "replica", "rebuilding"
	PrimaryServerID string // which server is the primary
	ReplicaServerID string // which server is this replica
	DataAddr        string // replica's current data address
	CtrlAddr        string // replica's current control address
	AddrVersion     uint64 // bumped on address change
}
// ControlAdapter converts master assignments into engine AssignmentIntent.
// Identity mapping: ReplicaID = <volume-name>/<replica-server-id>.
// This adapter does NOT decide recovery policy — it only translates
// master role/state into engine SessionKind.
//
// The adapter is stateless; the zero value is usable, and
// NewControlAdapter exists for symmetry with the other bridge constructors.
type ControlAdapter struct{}
// NewControlAdapter returns a ready-to-use (stateless) control adapter.
func NewControlAdapter() *ControlAdapter {
	return new(ControlAdapter)
}
// MakeReplicaID derives the stable engine ReplicaID for a replica:
// "<volumeName>/<serverID>". It is deliberately NOT derived from any
// address field, so identity survives address changes.
func MakeReplicaID(volumeName, serverID string) string {
	id := fmt.Sprintf("%s/%s", volumeName, serverID)
	return id
}
// ToAssignmentIntent converts a master assignment into an engine intent.
//
// primary contributes the epoch; each replica contributes a
// ReplicaAssignment (stable ReplicaID + mutable Endpoint) and, when its
// role requires recovery, an entry in RecoveryTargets. The adapter maps
// role transitions to SessionKind but does NOT decide the actual recovery
// outcome (that's the engine's job).
//
// Replicas and RecoveryTargets stay nil when there is nothing to put in
// them, preserving the original nil/empty semantics for callers.
func (ca *ControlAdapter) ToAssignmentIntent(primary MasterAssignment, replicas []MasterAssignment) engine.AssignmentIntent {
	intent := engine.AssignmentIntent{
		Epoch: primary.Epoch,
	}
	if len(replicas) > 0 {
		// Pre-size: exactly one assignment per replica.
		intent.Replicas = make([]engine.ReplicaAssignment, 0, len(replicas))
	}
	for _, r := range replicas {
		// Stable identity: volume + server, never the (mutable) address.
		replicaID := MakeReplicaID(r.VolumeName, r.ReplicaServerID)
		intent.Replicas = append(intent.Replicas, engine.ReplicaAssignment{
			ReplicaID: replicaID,
			Endpoint: engine.Endpoint{
				DataAddr: r.DataAddr,
				CtrlAddr: r.CtrlAddr,
				Version:  r.AddrVersion,
			},
		})
		// Map the role to a recovery session kind, if one is needed.
		if kind := mapRoleToSessionKind(r.Role); kind != "" {
			if intent.RecoveryTargets == nil {
				intent.RecoveryTargets = map[string]engine.SessionKind{}
			}
			intent.RecoveryTargets[replicaID] = kind
		}
	}
	return intent
}
// mapRoleToSessionKind maps a master-assigned role to an engine SessionKind.
// This is a pure translation — NO policy decision. The primary and any
// unknown role map to "" (no recovery session needed).
func mapRoleToSessionKind(role string) engine.SessionKind {
	if role == "rebuilding" {
		return engine.SessionRebuild
	}
	if role == "replica" {
		// Default recovery path for a reconnecting replica.
		return engine.SessionCatchUp
	}
	return ""
}

19
sw-block/bridge/blockvol/doc.go

@@ -0,0 +1,19 @@
// Package blockvol bridges the V2 engine to real blockvol storage and
// control-plane state.
//
// This package implements the adapter interfaces defined in
// sw-block/engine/replication/ using real blockvol internals as the
// source of truth.
//
// Hard rules (Phase 07):
// - ReplicaID = <volume-name>/<replica-server-id> (not address-derived)
// - blockvol executes recovery I/O but does NOT own recovery policy
// - Engine decides zero-gap vs catch-up vs rebuild
// - Bridge translates engine decisions into blockvol actions
//
// Adapter replacement order:
// P0: control_adapter (assignment → engine intent)
// P0: storage_adapter (blockvol state → RetainedHistory)
// P1: executor_bridge (engine executor → blockvol I/O)
// P1: observe_adapter (engine status → service diagnostics)
package blockvol

7
sw-block/bridge/blockvol/go.mod

@@ -0,0 +1,7 @@
module github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol
go 1.23.0
require github.com/seaweedfs/seaweedfs/sw-block/engine/replication v0.0.0
replace github.com/seaweedfs/seaweedfs/sw-block/engine/replication => ../../engine/replication

125
sw-block/bridge/blockvol/storage_adapter.go

@@ -0,0 +1,125 @@
package blockvol
import (
"fmt"
"sync"
"sync/atomic"
engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
)
// BlockVolState represents the real storage state from a blockvol instance.
// This is populated from actual blockvol fields, not reconstructed from
// metadata. Each field maps to a specific blockvol source:
//
//	WALHeadLSN        ← vol.wal.HeadLSN()
//	WALTailLSN        ← vol.flusher.RetentionFloor()
//	CommittedLSN      ← vol.coordinator.CommittedLSN (from group commit)
//	CheckpointLSN     ← vol.superblock.CheckpointLSN
//	CheckpointTrusted ← vol.superblock.Valid && checkpoint file exists
type BlockVolState struct {
	WALHeadLSN        uint64 // head of the WAL
	WALTailLSN        uint64 // retention floor; LSNs below this are recycled
	CommittedLSN      uint64 // group-commit committed LSN
	CheckpointLSN     uint64 // LSN of the most recent checkpoint
	CheckpointTrusted bool   // superblock valid and checkpoint file exists
}
// StorageAdapter implements engine.StorageAdapter using real blockvol state.
// It does NOT decide recovery policy — it only exposes storage truth.
//
// mu guards state and the pin maps; nextPinID is atomic (and in the
// current code is only touched while mu is held).
type StorageAdapter struct {
	mu        sync.Mutex    // guards state and the pin maps below
	state     BlockVolState // latest snapshot of real blockvol state
	nextPinID atomic.Uint64 // monotonically increasing pin-ID source

	// Pin tracking (real implementation would hold actual file/WAL references).
	snapshotPins map[uint64]bool
	walPins      map[uint64]bool
	fullBasePins map[uint64]bool
}
// NewStorageAdapter creates a storage adapter with empty pin tables.
// The caller must refresh its view via UpdateState whenever blockvol
// state changes.
func NewStorageAdapter() *StorageAdapter {
	sa := &StorageAdapter{}
	sa.snapshotPins = map[uint64]bool{}
	sa.walPins = map[uint64]bool{}
	sa.fullBasePins = map[uint64]bool{}
	return sa
}
// UpdateState refreshes the adapter's view of blockvol state. Called when
// blockvol completes a checkpoint, advances the WAL tail, etc.
func (sa *StorageAdapter) UpdateState(state BlockVolState) {
	sa.mu.Lock()
	sa.state = state
	sa.mu.Unlock()
}
// GetRetainedHistory returns the current WAL retention state, copied
// field-for-field from the real blockvol state (NOT reconstructed from
// test inputs).
func (sa *StorageAdapter) GetRetainedHistory() engine.RetainedHistory {
	sa.mu.Lock()
	snapshot := sa.state
	sa.mu.Unlock()
	return engine.RetainedHistory{
		HeadLSN:           snapshot.WALHeadLSN,
		TailLSN:           snapshot.WALTailLSN,
		CommittedLSN:      snapshot.CommittedLSN,
		CheckpointLSN:     snapshot.CheckpointLSN,
		CheckpointTrusted: snapshot.CheckpointTrusted,
	}
}
// PinSnapshot pins a checkpoint for rebuild use. It fails unless a
// trusted checkpoint exists at exactly checkpointLSN.
func (sa *StorageAdapter) PinSnapshot(checkpointLSN uint64) (engine.SnapshotPin, error) {
	sa.mu.Lock()
	defer sa.mu.Unlock()
	usable := sa.state.CheckpointTrusted && sa.state.CheckpointLSN == checkpointLSN
	if !usable {
		return engine.SnapshotPin{}, fmt.Errorf("no valid checkpoint at LSN %d", checkpointLSN)
	}
	pinID := sa.nextPinID.Add(1)
	sa.snapshotPins[pinID] = true
	return engine.SnapshotPin{LSN: checkpointLSN, PinID: pinID, Valid: true}, nil
}
// ReleaseSnapshot releases a pinned snapshot. Releasing an unknown pin
// is a harmless no-op.
func (sa *StorageAdapter) ReleaseSnapshot(pin engine.SnapshotPin) {
	sa.mu.Lock()
	delete(sa.snapshotPins, pin.PinID)
	sa.mu.Unlock()
}
// PinWALRetention holds WAL entries from startLSN onward. It fails when
// startLSN lies before the current retention floor, i.e. the requested
// range has already been recycled.
func (sa *StorageAdapter) PinWALRetention(startLSN uint64) (engine.RetentionPin, error) {
	sa.mu.Lock()
	defer sa.mu.Unlock()
	if tail := sa.state.WALTailLSN; startLSN < tail {
		return engine.RetentionPin{}, fmt.Errorf("WAL already recycled past LSN %d (tail=%d)", startLSN, tail)
	}
	pinID := sa.nextPinID.Add(1)
	sa.walPins[pinID] = true
	return engine.RetentionPin{StartLSN: startLSN, PinID: pinID, Valid: true}, nil
}
// ReleaseWALRetention releases a WAL retention hold. Releasing an unknown
// pin is a harmless no-op.
func (sa *StorageAdapter) ReleaseWALRetention(pin engine.RetentionPin) {
	sa.mu.Lock()
	delete(sa.walPins, pin.PinID)
	sa.mu.Unlock()
}
// PinFullBase pins a full-extent base image for full-base rebuild.
//
// NOTE(review): unlike PinSnapshot/PinWALRetention, this performs no
// validation against current state — presumably a full base can always
// be taken at the requested committed LSN; confirm against the engine
// contract.
func (sa *StorageAdapter) PinFullBase(committedLSN uint64) (engine.FullBasePin, error) {
	sa.mu.Lock()
	defer sa.mu.Unlock()
	pinID := sa.nextPinID.Add(1)
	sa.fullBasePins[pinID] = true
	return engine.FullBasePin{CommittedLSN: committedLSN, PinID: pinID, Valid: true}, nil
}
// ReleaseFullBase releases a pinned full base image. Releasing an unknown
// pin is a harmless no-op.
func (sa *StorageAdapter) ReleaseFullBase(pin engine.FullBasePin) {
	sa.mu.Lock()
	delete(sa.fullBasePins, pin.PinID)
	sa.mu.Unlock()
}
Loading…
Cancel
Save