diff --git a/sw-block/bridge/blockvol/bridge_test.go b/sw-block/bridge/blockvol/bridge_test.go
new file mode 100644
index 000000000..4ff20e2ed
--- /dev/null
+++ b/sw-block/bridge/blockvol/bridge_test.go
@@ -0,0 +1,222 @@
+package blockvol
+
+import (
+	"testing"
+
+	engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
+)
+
+// ============================================================
+// Phase 07 P0: Bridge adapter tests
+// Validates E1-E3 expectations against concrete adapter code.
+// ============================================================
+
+// --- E1: Real assignment → engine intent ---
+
+func TestControlAdapter_StableIdentity(t *testing.T) {
+	ca := NewControlAdapter()
+
+	primary := MasterAssignment{
+		VolumeName:      "pvc-data-1",
+		Epoch:           3,
+		Role:            "primary",
+		PrimaryServerID: "vs1",
+	}
+	replicas := []MasterAssignment{
+		{
+			VolumeName:      "pvc-data-1",
+			Epoch:           3,
+			Role:            "replica",
+			ReplicaServerID: "vs2",
+			DataAddr:        "10.0.0.2:9333",
+			CtrlAddr:        "10.0.0.2:9334",
+			AddrVersion:     1,
+		},
+	}
+
+	intent := ca.ToAssignmentIntent(primary, replicas)
+
+	if intent.Epoch != 3 {
+		t.Fatalf("epoch=%d", intent.Epoch)
+	}
+	if len(intent.Replicas) != 1 {
+		t.Fatalf("replicas=%d", len(intent.Replicas))
+	}
+
+	// ReplicaID is stable: volume-name/server-id (NOT address).
+	r := intent.Replicas[0]
+	if r.ReplicaID != "pvc-data-1/vs2" {
+		t.Fatalf("ReplicaID=%s (must be volume/server, not address)", r.ReplicaID)
+	}
+
+	// Endpoint is the address (mutable).
+	if r.Endpoint.DataAddr != "10.0.0.2:9333" {
+		t.Fatalf("DataAddr=%s", r.Endpoint.DataAddr)
+	}
+
+	// Recovery target mapped.
+	if intent.RecoveryTargets["pvc-data-1/vs2"] != engine.SessionCatchUp {
+		t.Fatalf("recovery=%s", intent.RecoveryTargets["pvc-data-1/vs2"])
+	}
+}
+
+func TestControlAdapter_AddressChangePreservesIdentity(t *testing.T) {
+	ca := NewControlAdapter()
+
+	// First assignment.
+	intent1 := ca.ToAssignmentIntent(
+		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
+		[]MasterAssignment{
+			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.2:9333", AddrVersion: 1},
+		},
+	)
+
+	// Address changes — new assignment.
+	intent2 := ca.ToAssignmentIntent(
+		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
+		[]MasterAssignment{
+			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica", DataAddr: "10.0.0.3:9333", AddrVersion: 2},
+		},
+	)
+
+	// Same ReplicaID despite different address.
+	if intent1.Replicas[0].ReplicaID != intent2.Replicas[0].ReplicaID {
+		t.Fatalf("identity changed: %s → %s",
+			intent1.Replicas[0].ReplicaID, intent2.Replicas[0].ReplicaID)
+	}
+
+	// Endpoint updated.
+	if intent2.Replicas[0].Endpoint.DataAddr != "10.0.0.3:9333" {
+		t.Fatalf("endpoint not updated: %s", intent2.Replicas[0].Endpoint.DataAddr)
+	}
+}
+
+func TestControlAdapter_RebuildRoleMapping(t *testing.T) {
+	ca := NewControlAdapter()
+
+	intent := ca.ToAssignmentIntent(
+		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
+		[]MasterAssignment{
+			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "rebuilding", DataAddr: "10.0.0.2:9333"},
+		},
+	)
+
+	if intent.RecoveryTargets["vol1/vs2"] != engine.SessionRebuild {
+		t.Fatalf("rebuilding role should map to SessionRebuild, got %s",
+			intent.RecoveryTargets["vol1/vs2"])
+	}
+}
+
+func TestControlAdapter_PrimaryNoRecovery(t *testing.T) {
+	ca := NewControlAdapter()
+
+	intent := ca.ToAssignmentIntent(
+		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary"},
+		[]MasterAssignment{}, // no replicas
+	)
+
+	if len(intent.RecoveryTargets) != 0 {
+		t.Fatal("primary should not have recovery targets")
+	}
+}
+
+// --- E2: Real storage truth → RetainedHistory ---
+
+func TestStorageAdapter_RetainedHistoryFromRealState(t *testing.T) {
+	sa := NewStorageAdapter()
+	sa.UpdateState(BlockVolState{
+		WALHeadLSN:        100,
+		WALTailLSN:        30,
+		CommittedLSN:      90,
+		CheckpointLSN:     50,
+		CheckpointTrusted: true,
+	})
+
+	rh := sa.GetRetainedHistory()
+
+	if rh.HeadLSN != 100 {
+		t.Fatalf("HeadLSN=%d", rh.HeadLSN)
+	}
+	if rh.TailLSN != 30 {
+		t.Fatalf("TailLSN=%d", rh.TailLSN)
+	}
+	if rh.CommittedLSN != 90 {
+		t.Fatalf("CommittedLSN=%d", rh.CommittedLSN)
+	}
+	if rh.CheckpointLSN != 50 {
+		t.Fatalf("CheckpointLSN=%d", rh.CheckpointLSN)
+	}
+	if !rh.CheckpointTrusted {
+		t.Fatal("CheckpointTrusted should be true")
+	}
+}
+
+func TestStorageAdapter_WALPinRejectsRecycled(t *testing.T) {
+	sa := NewStorageAdapter()
+	sa.UpdateState(BlockVolState{WALTailLSN: 50})
+
+	_, err := sa.PinWALRetention(30) // 30 < tail 50
+	if err == nil {
+		t.Fatal("WAL pin should be rejected when range is recycled")
+	}
+}
+
+func TestStorageAdapter_SnapshotPinRejectsInvalid(t *testing.T) {
+	sa := NewStorageAdapter()
+	sa.UpdateState(BlockVolState{CheckpointLSN: 50, CheckpointTrusted: false})
+
+	_, err := sa.PinSnapshot(50)
+	if err == nil {
+		t.Fatal("snapshot pin should be rejected when checkpoint is untrusted")
+	}
+}
+
+// --- E3: Engine integration through bridge ---
+
+func TestBridge_E2E_AssignmentToRecovery(t *testing.T) {
+	// Full bridge flow: master assignment → adapter → engine.
+	ca := NewControlAdapter()
+	sa := NewStorageAdapter()
+	sa.UpdateState(BlockVolState{
+		WALHeadLSN:        100,
+		WALTailLSN:        30,
+		CommittedLSN:      100,
+		CheckpointLSN:     50,
+		CheckpointTrusted: true,
+	})
+
+	// Step 1: master assignment → engine intent.
+	intent := ca.ToAssignmentIntent(
+		MasterAssignment{VolumeName: "vol1", Epoch: 1, Role: "primary", PrimaryServerID: "vs1"},
+		[]MasterAssignment{
+			{VolumeName: "vol1", ReplicaServerID: "vs2", Role: "replica",
+				DataAddr: "10.0.0.2:9333", CtrlAddr: "10.0.0.2:9334", AddrVersion: 1},
+		},
+	)
+
+	// Step 2: engine processes intent.
+	drv := engine.NewRecoveryDriver(sa)
+	drv.Orchestrator.ProcessAssignment(intent)
+
+	// Step 3: plan recovery from real storage state.
+	plan, err := drv.PlanRecovery("vol1/vs2", 70)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if plan.Outcome != engine.OutcomeCatchUp {
+		t.Fatalf("outcome=%s", plan.Outcome)
+	}
+	if !plan.Proof.Recoverable {
+		t.Fatalf("proof: %s", plan.Proof.Reason)
+	}
+
+	// Step 4: execute through engine executor.
+	exec := engine.NewCatchUpExecutor(drv, plan)
+	if err := exec.Execute([]uint64{80, 90, 100}, 0); err != nil {
+		t.Fatal(err)
+	}
+
+	if drv.Orchestrator.Registry.Sender("vol1/vs2").State() != engine.StateInSync {
+		t.Fatalf("state=%s", drv.Orchestrator.Registry.Sender("vol1/vs2").State())
+	}
+}
diff --git a/sw-block/bridge/blockvol/control_adapter.go b/sw-block/bridge/blockvol/control_adapter.go
new file mode 100644
index 000000000..c0a3a6d16
--- /dev/null
+++ b/sw-block/bridge/blockvol/control_adapter.go
@@ -0,0 +1,83 @@
+package blockvol
+
+import (
+	"fmt"
+
+	engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
+)
+
+// MasterAssignment represents a block-volume assignment from the master,
+// as delivered via heartbeat response. This is the raw input from the
+// existing master_grpc_server / block_heartbeat_loop path.
+type MasterAssignment struct {
+	VolumeName      string // e.g., "pvc-data-1"
+	Epoch           uint64
+	Role            string // "primary", "replica", "rebuilding"
+	PrimaryServerID string // which server is the primary
+	ReplicaServerID string // which server is this replica
+	DataAddr        string // replica's current data address
+	CtrlAddr        string // replica's current control address
+	AddrVersion     uint64 // bumped on address change
+}
+
+// ControlAdapter converts master assignments into engine AssignmentIntent.
+// Identity mapping: ReplicaID = <volumeName>/<serverID>.
+// This adapter does NOT decide recovery policy — it only translates
+// master role/state into engine SessionKind.
+type ControlAdapter struct{}
+
+// NewControlAdapter creates a control adapter.
+func NewControlAdapter() *ControlAdapter {
+	return &ControlAdapter{}
+}
+
+// MakeReplicaID derives a stable engine ReplicaID from volume + server identity.
+// NOT derived from any address field.
+func MakeReplicaID(volumeName, serverID string) string {
+	return fmt.Sprintf("%s/%s", volumeName, serverID)
+}
+
+// ToAssignmentIntent converts a master assignment into an engine intent.
+// The adapter maps role transitions to SessionKind but does NOT decide
+// the actual recovery outcome (that's the engine's job).
+func (ca *ControlAdapter) ToAssignmentIntent(primary MasterAssignment, replicas []MasterAssignment) engine.AssignmentIntent {
+	intent := engine.AssignmentIntent{
+		Epoch: primary.Epoch,
+	}
+
+	for _, r := range replicas {
+		replicaID := MakeReplicaID(r.VolumeName, r.ReplicaServerID)
+		intent.Replicas = append(intent.Replicas, engine.ReplicaAssignment{
+			ReplicaID: replicaID,
+			Endpoint: engine.Endpoint{
+				DataAddr: r.DataAddr,
+				CtrlAddr: r.CtrlAddr,
+				Version:  r.AddrVersion,
+			},
+		})
+
+		// Map role to recovery intent (if needed).
+		kind := mapRoleToSessionKind(r.Role)
+		if kind != "" {
+			if intent.RecoveryTargets == nil {
+				intent.RecoveryTargets = map[string]engine.SessionKind{}
+			}
+			intent.RecoveryTargets[replicaID] = kind
+		}
+	}
+
+	return intent
+}
+
+// mapRoleToSessionKind maps a master-assigned role to an engine SessionKind.
+// This is a pure translation — NO policy decision.
+func mapRoleToSessionKind(role string) engine.SessionKind {
+	switch role {
+	case "replica":
+		return engine.SessionCatchUp // default recovery for reconnecting replicas
+	case "rebuilding":
+		return engine.SessionRebuild
+	default:
+		return "" // no recovery needed (primary, or unknown)
+	}
+}
diff --git a/sw-block/bridge/blockvol/doc.go b/sw-block/bridge/blockvol/doc.go
new file mode 100644
index 000000000..95b803ecd
--- /dev/null
+++ b/sw-block/bridge/blockvol/doc.go
@@ -0,0 +1,19 @@
+// Package blockvol bridges the V2 engine to real blockvol storage and
+// control-plane state.
+//
+// This package implements the adapter interfaces defined in
+// sw-block/engine/replication/ using real blockvol internals as the
+// source of truth.
+//
+// Hard rules (Phase 07):
+//   - ReplicaID = <volume>/<server> (not address-derived)
+//   - blockvol executes recovery I/O but does NOT own recovery policy
+//   - Engine decides zero-gap vs catch-up vs rebuild
+//   - Bridge translates engine decisions into blockvol actions
+//
+// Adapter replacement order:
+//   - P0: control_adapter (assignment → engine intent)
+//   - P0: storage_adapter (blockvol state → RetainedHistory)
+//   - P1: executor_bridge (engine executor → blockvol I/O)
+//   - P1: observe_adapter (engine status → service diagnostics)
+package blockvol
diff --git a/sw-block/bridge/blockvol/go.mod b/sw-block/bridge/blockvol/go.mod
new file mode 100644
index 000000000..6accf3a0b
--- /dev/null
+++ b/sw-block/bridge/blockvol/go.mod
@@ -0,0 +1,7 @@
+module github.com/seaweedfs/seaweedfs/sw-block/bridge/blockvol
+
+go 1.23.0
+
+require github.com/seaweedfs/seaweedfs/sw-block/engine/replication v0.0.0
+
+replace github.com/seaweedfs/seaweedfs/sw-block/engine/replication => ../../engine/replication
diff --git a/sw-block/bridge/blockvol/storage_adapter.go b/sw-block/bridge/blockvol/storage_adapter.go
new file mode 100644
index 000000000..9169abb97
--- /dev/null
+++ b/sw-block/bridge/blockvol/storage_adapter.go
@@ -0,0 +1,125 @@
+package blockvol
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	engine "github.com/seaweedfs/seaweedfs/sw-block/engine/replication"
+)
+
+// BlockVolState represents the real storage state from a blockvol instance.
+// This is populated from actual blockvol fields, not reconstructed from
+// metadata. Each field maps to a specific blockvol source:
+//
+//	WALHeadLSN        ← vol.wal.HeadLSN()
+//	WALTailLSN        ← vol.flusher.RetentionFloor()
+//	CommittedLSN      ← vol.coordinator.CommittedLSN (from group commit)
+//	CheckpointLSN     ← vol.superblock.CheckpointLSN
+//	CheckpointTrusted ← vol.superblock.Valid && checkpoint file exists
+type BlockVolState struct {
+	WALHeadLSN        uint64
+	WALTailLSN        uint64
+	CommittedLSN      uint64
+	CheckpointLSN     uint64
+	CheckpointTrusted bool
+}
+
+// StorageAdapter implements engine.StorageAdapter using real blockvol state.
+// It does NOT decide recovery policy — it only exposes storage truth.
+type StorageAdapter struct {
+	mu        sync.Mutex
+	state     BlockVolState
+	nextPinID atomic.Uint64
+
+	// Pin tracking (real implementation would hold actual file/WAL references).
+	snapshotPins map[uint64]bool
+	walPins      map[uint64]bool
+	fullBasePins map[uint64]bool
+}
+
+// NewStorageAdapter creates a storage adapter. The caller must update
+// state via UpdateState when blockvol state changes.
+func NewStorageAdapter() *StorageAdapter {
+	return &StorageAdapter{
+		snapshotPins: map[uint64]bool{},
+		walPins:      map[uint64]bool{},
+		fullBasePins: map[uint64]bool{},
+	}
+}
+
+// UpdateState refreshes the adapter's view of blockvol state.
+// Called when blockvol completes a checkpoint, advances WAL tail, etc.
+func (sa *StorageAdapter) UpdateState(state BlockVolState) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	sa.state = state
+}
+
+// GetRetainedHistory returns the current WAL retention state from real
+// blockvol fields. This is NOT reconstructed from test inputs.
+func (sa *StorageAdapter) GetRetainedHistory() engine.RetainedHistory {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	return engine.RetainedHistory{
+		HeadLSN:           sa.state.WALHeadLSN,
+		TailLSN:           sa.state.WALTailLSN,
+		CommittedLSN:      sa.state.CommittedLSN,
+		CheckpointLSN:     sa.state.CheckpointLSN,
+		CheckpointTrusted: sa.state.CheckpointTrusted,
+	}
+}
+
+// PinSnapshot pins a checkpoint for rebuild use.
+func (sa *StorageAdapter) PinSnapshot(checkpointLSN uint64) (engine.SnapshotPin, error) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	if !sa.state.CheckpointTrusted || sa.state.CheckpointLSN != checkpointLSN {
+		return engine.SnapshotPin{}, fmt.Errorf("no valid checkpoint at LSN %d", checkpointLSN)
+	}
+	id := sa.nextPinID.Add(1)
+	sa.snapshotPins[id] = true
+	return engine.SnapshotPin{LSN: checkpointLSN, PinID: id, Valid: true}, nil
+}
+
+// ReleaseSnapshot releases a pinned snapshot.
+func (sa *StorageAdapter) ReleaseSnapshot(pin engine.SnapshotPin) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	delete(sa.snapshotPins, pin.PinID)
+}
+
+// PinWALRetention holds WAL entries from startLSN.
+func (sa *StorageAdapter) PinWALRetention(startLSN uint64) (engine.RetentionPin, error) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	if startLSN < sa.state.WALTailLSN {
+		return engine.RetentionPin{}, fmt.Errorf("WAL already recycled past LSN %d (tail=%d)", startLSN, sa.state.WALTailLSN)
+	}
+	id := sa.nextPinID.Add(1)
+	sa.walPins[id] = true
+	return engine.RetentionPin{StartLSN: startLSN, PinID: id, Valid: true}, nil
+}
+
+// ReleaseWALRetention releases a WAL retention hold.
+func (sa *StorageAdapter) ReleaseWALRetention(pin engine.RetentionPin) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	delete(sa.walPins, pin.PinID)
+}
+
+// PinFullBase pins a full-extent base image for full-base rebuild.
+func (sa *StorageAdapter) PinFullBase(committedLSN uint64) (engine.FullBasePin, error) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	id := sa.nextPinID.Add(1)
+	sa.fullBasePins[id] = true
+	return engine.FullBasePin{CommittedLSN: committedLSN, PinID: id, Valid: true}, nil
+}
+
+// ReleaseFullBase releases a pinned full base image.
+func (sa *StorageAdapter) ReleaseFullBase(pin engine.FullBasePin) {
+	sa.mu.Lock()
+	defer sa.mu.Unlock()
+	delete(sa.fullBasePins, pin.PinID)
+}