You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1408 lines
44 KiB

package weed_server
import (
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// VolumeStatus tracks the lifecycle of a block volume entry.
type VolumeStatus int

const (
	// StatusPending marks an entry created via RPC that has not yet been
	// confirmed by a heartbeat from its volume server.
	StatusPending VolumeStatus = iota
	// StatusActive marks an entry confirmed by a heartbeat from the volume server.
	StatusActive
)
// ReplicaInfo tracks one replica of a block volume (CP8-2).
// The health/LSN/heartbeat fields are refreshed from the replica's
// heartbeats and are consulted by the promotion eligibility gates.
type ReplicaInfo struct {
	Server        string    // replica VS address
	Path          string    // file path on replica VS
	ISCSIAddr     string    // iSCSI target address
	IQN           string    // iSCSI qualified name
	NvmeAddr      string    // NVMe/TCP target address (ip:port), empty if NVMe disabled
	NQN           string    // NVMe subsystem NQN, empty if NVMe disabled
	DataAddr      string    // WAL receiver data listen addr
	CtrlAddr      string    // WAL receiver ctrl listen addr
	HealthScore   float64   // from heartbeat (0.0-1.0)
	WALHeadLSN    uint64    // from heartbeat
	WALLag        uint64    // computed: primary.WALHeadLSN - replica.WALHeadLSN (0 when caught up)
	LastHeartbeat time.Time // last heartbeat received from this replica; zero means never heartbeated
	Role          uint32    // replica role (RoleReplica, RoleRebuilding, etc.)
}
const (
	// DefaultPromotionLSNTolerance is the max WAL LSN lag allowed for promotion eligibility.
	// A replica whose WALHeadLSN trails the primary's by more than this is rejected
	// with reason "wal_lag". Configurable per-registry via SetPromotionLSNTolerance.
	DefaultPromotionLSNTolerance uint64 = 100
)
// BlockVolumeEntry tracks one block volume across the cluster.
// Entries are mutated only while holding the owning registry's mutex.
type BlockVolumeEntry struct {
	Name         string
	VolumeServer string // volume server address (ip:port or grpc addr)
	Path         string // file path on volume server
	IQN          string
	ISCSIAddr    string
	NvmeAddr     string // NVMe/TCP target address (ip:port), empty if NVMe disabled
	NQN          string // NVMe subsystem NQN, empty if NVMe disabled
	SizeBytes    uint64
	ReplicaPlacement string // SeaweedFS placement string: "000", "001", "010", "100"
	Epoch        uint64
	Role         uint32
	Status       VolumeStatus
	// Deprecated scalar replica fields (CP6-3). Use Replicas[] for new code.
	// Mutators keep these mirroring Replicas[0] for backward compatibility.
	ReplicaServer     string
	ReplicaPath       string
	ReplicaISCSIAddr  string
	ReplicaIQN        string
	ReplicaDataAddr   string
	ReplicaCtrlAddr   string
	RebuildListenAddr string // rebuild server listen addr on primary
	// CP8-2: Multi-replica support.
	ReplicaFactor   int           // 2 or 3 (default 2)
	Replicas        []ReplicaInfo // one per replica (RF-1 entries)
	HealthScore     float64       // primary health score from heartbeat
	ReplicaDegraded bool          // primary reports degraded replicas
	WALHeadLSN      uint64        // primary WAL head LSN from heartbeat
	// CP8-3-1: Durability mode.
	DurabilityMode string // "best_effort", "sync_all", "sync_quorum"
	// Lease tracking for failover (CP6-3 F2).
	LastLeaseGrant time.Time
	LeaseTTL       time.Duration
	// CP11A-2: Coordinated expand tracking. While ExpandInProgress is true,
	// heartbeat size updates are suppressed (see UpdateFullHeartbeat).
	ExpandInProgress  bool
	ExpandFailed      bool // true = primary committed but replica(s) failed; size suppressed
	PendingExpandSize uint64
	ExpandEpoch       uint64
}
// HasReplica reports whether this volume has at least one replica,
// consulting both the Replicas slice and the deprecated scalar field.
func (e *BlockVolumeEntry) HasReplica() bool {
	if len(e.Replicas) > 0 {
		return true
	}
	return e.ReplicaServer != ""
}
// FirstReplica returns a pointer to the first entry of Replicas,
// or nil when the volume has no replicas.
func (e *BlockVolumeEntry) FirstReplica() *ReplicaInfo {
	if len(e.Replicas) == 0 {
		return nil
	}
	return &e.Replicas[0]
}
// ReplicaByServer returns a pointer to the replica hosted on the given
// server, or nil when no replica lives there.
func (e *BlockVolumeEntry) ReplicaByServer(server string) *ReplicaInfo {
	for i := range e.Replicas {
		if ri := &e.Replicas[i]; ri.Server == server {
			return ri
		}
	}
	return nil
}
// BestReplicaForPromotion returns the best replica for promotion, or nil if none eligible.
// Criteria: highest HealthScore, tie-break by highest WALHeadLSN, then first in list.
func (e *BlockVolumeEntry) BestReplicaForPromotion() *ReplicaInfo {
	if len(e.Replicas) == 0 {
		return nil
	}
	bestIdx := 0
	for i := 1; i < len(e.Replicas); i++ {
		cand, best := &e.Replicas[i], &e.Replicas[bestIdx]
		switch {
		case cand.HealthScore > best.HealthScore:
			bestIdx = i
		case cand.HealthScore == best.HealthScore && cand.WALHeadLSN > best.WALHeadLSN:
			bestIdx = i
		}
	}
	return &e.Replicas[bestIdx]
}
// BlockVolumeRegistry is the in-memory registry of block volumes.
// Rebuilt from heartbeats on master restart (no persistence).
// The maps and scalar config are guarded by mu; the atomic counters and
// the inflight sync.Map are safe to touch without it.
type BlockVolumeRegistry struct {
	mu           sync.RWMutex
	volumes      map[string]*BlockVolumeEntry // keyed by name
	byServer     map[string]map[string]bool   // server -> set of volume names (both primary and replica hosts)
	blockServers map[string]bool              // servers known to support block volumes
	// Promotion eligibility: max WAL LSN lag for replica to be promotable.
	promotionLSNTolerance uint64
	// inflight guards concurrent CreateBlockVolume for the same name.
	inflight sync.Map // name -> *inflightEntry
	// Metrics (CP8-4).
	PromotionsTotal atomic.Uint64
	FailoversTotal  atomic.Uint64
	RebuildsTotal   atomic.Uint64
}

// inflightEntry is the marker value stored in the inflight map; only its
// presence matters.
type inflightEntry struct{}
// NewBlockVolumeRegistry creates an empty registry with the default
// promotion LSN tolerance.
func NewBlockVolumeRegistry() *BlockVolumeRegistry {
	reg := &BlockVolumeRegistry{
		volumes:               map[string]*BlockVolumeEntry{},
		byServer:              map[string]map[string]bool{},
		blockServers:          map[string]bool{},
		promotionLSNTolerance: DefaultPromotionLSNTolerance,
	}
	return reg
}
// Register adds an entry to the registry and indexes every server that
// hosts it (primary plus all replicas) so ListByServer finds them.
// Returns an error if a volume with the same name already exists.
func (r *BlockVolumeRegistry) Register(entry *BlockVolumeEntry) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.volumes[entry.Name]; exists {
		return fmt.Errorf("block volume %q already registered", entry.Name)
	}
	r.volumes[entry.Name] = entry
	r.addToServer(entry.VolumeServer, entry.Name)
	for i := range entry.Replicas {
		r.addToServer(entry.Replicas[i].Server, entry.Name)
	}
	return nil
}
// Unregister removes the named entry and all of its byServer index
// references (primary and replicas). Returns the removed entry, or nil
// if the name was not registered.
func (r *BlockVolumeRegistry) Unregister(name string) *BlockVolumeEntry {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry := r.volumes[name]
	if entry == nil {
		return nil
	}
	delete(r.volumes, name)
	r.removeFromServer(entry.VolumeServer, name)
	for i := range entry.Replicas {
		r.removeFromServer(entry.Replicas[i].Server, name)
	}
	return entry
}
// AcquireExpandInflight tries to acquire an expand lock for the named volume
// and records the pending expand metadata on the entry.
// Returns false if the volume is unknown, an expand is already in flight,
// or a previous expand failed (requires ClearExpandFailed first).
func (r *BlockVolumeRegistry) AcquireExpandInflight(name string, pendingSize, expandEpoch uint64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry := r.volumes[name]
	if entry == nil || entry.ExpandInProgress || entry.ExpandFailed {
		return false
	}
	entry.ExpandInProgress = true
	entry.PendingExpandSize = pendingSize
	entry.ExpandEpoch = expandEpoch
	return true
}
// ReleaseExpandInflight clears all expand tracking fields for the named volume.
// Only call on clean success or clean cancel (all nodes rolled back).
// Unknown names are ignored.
func (r *BlockVolumeRegistry) ReleaseExpandInflight(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if entry := r.volumes[name]; entry != nil {
		entry.ExpandInProgress = false
		entry.ExpandFailed = false
		entry.PendingExpandSize = 0
		entry.ExpandEpoch = 0
	}
}
// MarkExpandFailed transitions the entry from in-progress to failed.
// ExpandInProgress stays true so heartbeat continues to suppress size updates.
// The entry remains locked until ClearExpandFailed is called (manual reconciliation).
// Unknown names are ignored.
func (r *BlockVolumeRegistry) MarkExpandFailed(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if entry := r.volumes[name]; entry != nil {
		// ExpandInProgress, PendingExpandSize, and ExpandEpoch are left
		// untouched — all are needed for diagnosis.
		entry.ExpandFailed = true
	}
}
// ClearExpandFailed resets the expand-failed state so a new expand can be attempted.
// Called by an operator or automated reconciliation after the inconsistency is resolved
// (e.g., failed replica rebuilt or manually expanded). Unknown names are ignored.
func (r *BlockVolumeRegistry) ClearExpandFailed(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if entry := r.volumes[name]; entry != nil {
		entry.ExpandInProgress = false
		entry.ExpandFailed = false
		entry.PendingExpandSize = 0
		entry.ExpandEpoch = 0
	}
}
// UpdateSize updates the size of a registered volume.
// Called only after a successful VS expand to keep registry in sync.
func (r *BlockVolumeRegistry) UpdateSize(name string, newSizeBytes uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if entry, found := r.volumes[name]; found {
		entry.SizeBytes = newSizeBytes
		return nil
	}
	return fmt.Errorf("block volume %q not found in registry", name)
}
// Lookup returns the entry registered under name and whether it exists.
func (r *BlockVolumeRegistry) Lookup(name string) (*BlockVolumeEntry, bool) {
	r.mu.RLock()
	entry, found := r.volumes[name]
	r.mu.RUnlock()
	return entry, found
}
// ListByServer returns all entries hosted on the given server (as primary
// or replica, per the byServer index). Returns nil for unknown servers.
func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names, indexed := r.byServer[server]
	if !indexed {
		return nil
	}
	result := make([]*BlockVolumeEntry, 0, len(names))
	for name := range names {
		if entry, found := r.volumes[name]; found {
			result = append(result, entry)
		}
	}
	return result
}
// UpdateFullHeartbeat reconciles the registry from a full heartbeat.
// Called on the first heartbeat from a volume server.
// Three phases: (1) remove/detach entries indexed for this server whose
// paths are not reported (stale), (2) refresh fields on reported volumes
// that are already known, (3) auto-register reported volumes unknown to
// the registry (recovers state after a master restart).
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Mark server as block-capable since it sent block volume info.
	r.blockServers[server] = true
	// Build set of reported paths for O(1) membership checks below.
	reported := make(map[string]*master_pb.BlockVolumeInfoMessage, len(infos))
	for _, info := range infos {
		reported[info.Path] = info
	}
	// Phase 1: find entries for this server that are NOT reported -> reconcile.
	// Deleting map keys while ranging is safe in Go.
	if names, ok := r.byServer[server]; ok {
		for name := range names {
			entry := r.volumes[name]
			if entry == nil {
				continue
			}
			if entry.VolumeServer == server {
				// Server is the primary: check if primary path is reported.
				if _, found := reported[entry.Path]; !found {
					// B-10: Do not delete entries with a coordinated expand in flight.
					// The primary may have restarted mid-expand; deleting the entry
					// would orphan the volume and strand the expand coordinator.
					if entry.ExpandInProgress {
						glog.Warningf("block registry: skipping stale-cleanup for %q (ExpandInProgress=true, server=%s)",
							name, server)
						continue
					}
					delete(r.volumes, name)
					delete(names, name)
					// Also clean up replica entries from byServer.
					for _, ri := range entry.Replicas {
						r.removeFromServer(ri.Server, name)
					}
				}
			} else {
				// Server is a replica: check if replica path is reported.
				ri := entry.ReplicaByServer(server)
				if ri == nil {
					// No replica record — stale byServer index, just clean up.
					delete(names, name)
					continue
				}
				if _, found := reported[ri.Path]; !found {
					// Replica path not reported — remove this replica, NOT the whole volume.
					r.removeReplicaLocked(entry, server, name)
					delete(names, name)
					glog.V(0).Infof("block registry: removed stale replica %s for %q (path %s not in heartbeat)",
						server, name, ri.Path)
				}
			}
		}
	}
	// Phases 2+3: update or add entries for reported volumes.
	for _, info := range infos {
		// Find existing entry: search byServer index for matching path (primary or replica).
		var existing *BlockVolumeEntry
		var existingName string
		if names, ok := r.byServer[server]; ok {
			for vname := range names {
				if e := r.volumes[vname]; e != nil {
					if e.VolumeServer == server && e.Path == info.Path {
						existing = e
						existingName = vname
						break
					}
					if ri := e.ReplicaByServer(server); ri != nil && ri.Path == info.Path {
						existing = e
						existingName = vname
						break
					}
				}
			}
		}
		// Also try lookup by name derived from path (handles post-restart,
		// when the byServer index no longer links this server to the volume).
		if existing == nil {
			name := nameFromPath(info.Path)
			if name != "" {
				if e, ok := r.volumes[name]; ok {
					existing = e
					existingName = name
				}
			}
		}
		if existing != nil {
			isPrimary := existing.VolumeServer == server
			isReplica := existing.ReplicaByServer(server) != nil
			if isPrimary {
				// Primary heartbeat: update primary fields.
				// CP11A-2: skip size update during coordinated expand.
				if !existing.ExpandInProgress {
					existing.SizeBytes = info.VolumeSize
				}
				existing.Epoch = info.Epoch
				existing.Role = info.Role
				existing.Status = StatusActive
				existing.LastLeaseGrant = time.Now()
				existing.HealthScore = info.HealthScore
				existing.ReplicaDegraded = info.ReplicaDegraded
				existing.WALHeadLSN = info.WalHeadLsn
				// F3: only update DurabilityMode when non-empty (prevents older VS from clearing strict mode).
				if info.DurabilityMode != "" {
					existing.DurabilityMode = info.DurabilityMode
				}
				// F5: update replica addresses from heartbeat info.
				if info.ReplicaDataAddr != "" {
					existing.ReplicaDataAddr = info.ReplicaDataAddr
				}
				if info.ReplicaCtrlAddr != "" {
					existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr
				}
				// NVMe publication: update NVMe fields from heartbeat.
				// Required for master restart reconstruction and NVMe enable/disable.
				// Note: assigned unconditionally, so an empty value clears them.
				existing.NvmeAddr = info.NvmeAddr
				existing.NQN = info.Nqn
				// Sync first replica's data addrs to Replicas[].
				if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 {
					existing.Replicas[0].DataAddr = info.ReplicaDataAddr
					existing.Replicas[0].CtrlAddr = info.ReplicaCtrlAddr
				}
			} else if isReplica {
				// Replica heartbeat: update the matching ReplicaInfo in place.
				for i := range existing.Replicas {
					if existing.Replicas[i].Server == server {
						existing.Replicas[i].Path = info.Path
						existing.Replicas[i].WALHeadLSN = info.WalHeadLsn
						existing.Replicas[i].HealthScore = info.HealthScore
						existing.Replicas[i].LastHeartbeat = time.Now()
						existing.Replicas[i].Role = info.Role
						existing.Replicas[i].NvmeAddr = info.NvmeAddr
						existing.Replicas[i].NQN = info.Nqn
						// WALLag is clamped at zero when the replica is at or
						// ahead of the primary's last-known head LSN.
						if existing.WALHeadLSN > info.WalHeadLsn {
							existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn
						} else {
							existing.Replicas[i].WALLag = 0
						}
						break
					}
				}
			} else {
				// Fix #3: Server reports a volume that exists but has no record of this server.
				// This happens after master restart: primary heartbeat re-created the entry,
				// but replica heartbeat arrives and isn't linked. Add as replica.
				ri := ReplicaInfo{
					Server:        server,
					Path:          info.Path,
					HealthScore:   info.HealthScore,
					WALHeadLSN:    info.WalHeadLsn,
					LastHeartbeat: time.Now(),
					Role:          info.Role,
					NvmeAddr:      info.NvmeAddr,
					NQN:           info.Nqn,
				}
				existing.Replicas = append(existing.Replicas, ri)
				r.addToServer(server, existingName)
				// Sync deprecated scalar fields.
				if len(existing.Replicas) == 1 {
					existing.ReplicaServer = ri.Server
					existing.ReplicaPath = ri.Path
				}
				glog.V(0).Infof("block registry: attached replica %s for %q from heartbeat (path=%s)",
					server, existingName, info.Path)
			}
		} else {
			// Phase 3: auto-register volumes reported by heartbeat but not in registry.
			// This recovers state after master restart. The reporting server is
			// assumed to be the primary for the recovered entry.
			name := nameFromPath(info.Path)
			if name == "" {
				continue
			}
			if _, dup := r.volumes[name]; !dup {
				entry := &BlockVolumeEntry{
					Name:           name,
					VolumeServer:   server,
					Path:           info.Path,
					SizeBytes:      info.VolumeSize,
					Epoch:          info.Epoch,
					Role:           info.Role,
					Status:         StatusActive,
					LastLeaseGrant: time.Now(),
					LeaseTTL:       30 * time.Second,
					HealthScore:    info.HealthScore,
					WALHeadLSN:     info.WalHeadLsn,
					DurabilityMode: info.DurabilityMode,
				}
				if info.ReplicaDataAddr != "" {
					entry.ReplicaDataAddr = info.ReplicaDataAddr
				}
				if info.ReplicaCtrlAddr != "" {
					entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr
				}
				// NVMe publication: propagate NVMe fields from heartbeat.
				entry.NvmeAddr = info.NvmeAddr
				entry.NQN = info.Nqn
				r.volumes[name] = entry
				r.addToServer(server, name)
				glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
					name, server, info.Path, info.VolumeSize)
			}
		}
	}
}
// UpdateDeltaHeartbeat processes incremental new/deleted block volumes.
// Called on subsequent heartbeats (not the first).
//
// Removal mirrors UpdateFullHeartbeat's reconciliation: if the reporting
// server is the volume's primary, the whole entry is removed (including the
// byServer index entries of its replicas, which were previously leaked);
// if it is a replica, only that replica is detached. This also prevents a
// replica's delete from destroying the whole volume entry when primary and
// replica happen to use the same file path. The B-10 ExpandInProgress guard
// from the full-heartbeat path is honored here too.
func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Remove deleted volumes (or detach deleted replicas).
	for _, rm := range removed {
		names, ok := r.byServer[server]
		if !ok {
			break
		}
		for name := range names {
			e := r.volumes[name]
			if e == nil {
				continue
			}
			if e.VolumeServer == server && e.Path == rm.Path {
				// Primary dropped the volume: remove the whole entry.
				// B-10: keep entries with a coordinated expand in flight.
				if e.ExpandInProgress {
					glog.Warningf("block registry: skipping delta-delete for %q (ExpandInProgress=true, server=%s)",
						name, server)
					break
				}
				delete(r.volumes, name)
				delete(names, name)
				// Clean up replica entries from the byServer index.
				for _, ri := range e.Replicas {
					r.removeFromServer(ri.Server, name)
				}
				break
			}
			if ri := e.ReplicaByServer(server); ri != nil && ri.Path == rm.Path {
				// Replica dropped its copy: detach the replica, keep the volume.
				r.removeReplicaLocked(e, server, name)
				delete(names, name)
				break
			}
		}
	}
	// Mark newly appeared volumes as active (if they exist in registry).
	for _, add := range added {
		names, ok := r.byServer[server]
		if !ok {
			break
		}
		for name := range names {
			e := r.volumes[name]
			if e == nil {
				continue
			}
			if e.Path == add.Path {
				e.Status = StatusActive
				break
			}
			if ri := e.ReplicaByServer(server); ri != nil && ri.Path == add.Path {
				// A replica reporting its path confirms the volume exists.
				e.Status = StatusActive
				break
			}
		}
	}
}
// PickServer returns the server address with the fewest block volumes.
// servers is the list of online volume server addresses.
// Returns an error when the list is empty.
func (r *BlockVolumeRegistry) PickServer(servers []string) (string, error) {
	if len(servers) == 0 {
		return "", fmt.Errorf("no block volume servers available")
	}
	r.mu.RLock()
	defer r.mu.RUnlock()
	chosen, chosenCount := servers[0], r.countForServer(servers[0])
	for _, candidate := range servers[1:] {
		if n := r.countForServer(candidate); n < chosenCount {
			chosen, chosenCount = candidate, n
		}
	}
	return chosen, nil
}
// AcquireInflight tries to acquire a per-name create lock.
// Returns true if acquired (caller must call ReleaseInflight when done).
// Returns false if another create is already in progress for this name.
func (r *BlockVolumeRegistry) AcquireInflight(name string) bool {
	if _, alreadyHeld := r.inflight.LoadOrStore(name, &inflightEntry{}); alreadyHeld {
		return false
	}
	// We stored the marker, so we own the lock.
	return true
}
// ReleaseInflight releases the per-name create lock acquired by
// AcquireInflight. Releasing an unheld name is a no-op.
func (r *BlockVolumeRegistry) ReleaseInflight(name string) { r.inflight.Delete(name) }
// countForServer returns the number of volumes indexed for the given server.
// A missing server yields 0 (len of a nil map). Caller must hold at least RLock.
func (r *BlockVolumeRegistry) countForServer(server string) int {
	return len(r.byServer[server])
}
// addToServer records that server hosts the named volume, lazily creating
// the server's name set. Caller must hold the write lock.
func (r *BlockVolumeRegistry) addToServer(server, name string) {
	set := r.byServer[server]
	if set == nil {
		set = make(map[string]bool)
		r.byServer[server] = set
	}
	set[name] = true
}
// removeFromServer drops the name from server's index set and deletes the
// set entirely once empty. Caller must hold the write lock.
func (r *BlockVolumeRegistry) removeFromServer(server, name string) {
	set, indexed := r.byServer[server]
	if !indexed {
		return
	}
	delete(set, name)
	if len(set) == 0 {
		delete(r.byServer, server)
	}
}
// removeReplicaLocked removes a replica from an entry by server address.
// Caller must hold r.mu. The deprecated scalar fields are re-synced to
// the new first replica, or cleared when no replicas remain.
func (r *BlockVolumeRegistry) removeReplicaLocked(entry *BlockVolumeEntry, server, name string) {
	kept := make([]ReplicaInfo, 0, len(entry.Replicas))
	for i := range entry.Replicas {
		if entry.Replicas[i].Server != server {
			kept = append(kept, entry.Replicas[i])
		}
	}
	entry.Replicas = kept
	// Mirror Replicas[0] into the deprecated scalar fields; the zero value
	// clears them when the replica list is now empty.
	var scalar ReplicaInfo
	if len(kept) > 0 {
		scalar = kept[0]
	}
	entry.ReplicaServer = scalar.Server
	entry.ReplicaPath = scalar.Path
	entry.ReplicaISCSIAddr = scalar.ISCSIAddr
	entry.ReplicaIQN = scalar.IQN
	entry.ReplicaDataAddr = scalar.DataAddr
	entry.ReplicaCtrlAddr = scalar.CtrlAddr
}
// SetReplica sets replica info for a registered volume.
// Deprecated: use AddReplica for new code. This method syncs both scalar and Replicas[].
//
// When replacing an existing replica record for the same server, ALL
// heartbeat-derived state is preserved (not just health/LSN): previously
// LastHeartbeat, Role, WALLag, NvmeAddr and NQN were zeroed, which made the
// replica fail the promotion gates (no_heartbeat / wrong_role) until its
// next heartbeat arrived.
func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return fmt.Errorf("block volume %q not found", name)
	}
	// Remove old replica from byServer index before replacing.
	if entry.ReplicaServer != "" && entry.ReplicaServer != server {
		r.removeFromServer(entry.ReplicaServer, name)
	}
	entry.ReplicaServer = server
	entry.ReplicaPath = path
	entry.ReplicaISCSIAddr = iscsiAddr
	entry.ReplicaIQN = iqn
	r.addToServer(server, name)
	// CP8-2: also sync to Replicas[].
	replaced := false
	for i := range entry.Replicas {
		if entry.Replicas[i].Server == server {
			// Keep heartbeat-derived state; overwrite only the addressed fields.
			entry.Replicas[i].Path = path
			entry.Replicas[i].ISCSIAddr = iscsiAddr
			entry.Replicas[i].IQN = iqn
			replaced = true
			break
		}
	}
	if !replaced {
		entry.Replicas = append(entry.Replicas, ReplicaInfo{Server: server, Path: path, ISCSIAddr: iscsiAddr, IQN: iqn})
	}
	return nil
}
// ClearReplica removes all replica info for a registered volume.
// Deprecated: use RemoveReplica for new code. This method clears both scalar and Replicas[].
func (r *BlockVolumeRegistry) ClearReplica(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return fmt.Errorf("block volume %q not found", name)
	}
	// Drop every replica host from the byServer index: the scalar one first,
	// then any slice entries on other servers.
	if entry.ReplicaServer != "" {
		r.removeFromServer(entry.ReplicaServer, name)
	}
	for i := range entry.Replicas {
		if srv := entry.Replicas[i].Server; srv != entry.ReplicaServer {
			r.removeFromServer(srv, name)
		}
	}
	entry.ReplicaServer, entry.ReplicaPath = "", ""
	entry.ReplicaISCSIAddr, entry.ReplicaIQN = "", ""
	entry.ReplicaDataAddr, entry.ReplicaCtrlAddr = "", ""
	entry.Replicas = nil
	return nil
}
// SwapPrimaryReplica promotes the replica to primary and clears the old replica.
// The old primary becomes the new replica (if it reconnects, rebuild will handle it).
// Epoch is atomically computed as entry.Epoch+1 inside the lock (R2-F5).
// Returns the new epoch for use in assignment messages.
//
// Fixes applied: (1) the promoted server is also removed from Replicas[]
// so it is not simultaneously listed as primary and replica of the same
// volume; (2) the demoted primary is re-indexed in byServer as a replica
// host, consistent with Register/SetReplica/AddReplica, so ListByServer
// and heartbeat reconciliation can see it.
func (r *BlockVolumeRegistry) SwapPrimaryReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return 0, fmt.Errorf("block volume %q not found", name)
	}
	if entry.ReplicaServer == "" {
		return 0, fmt.Errorf("block volume %q has no replica", name)
	}
	// Remove old primary from byServer index.
	r.removeFromServer(entry.VolumeServer, name)
	oldPrimaryServer := entry.VolumeServer
	oldPrimaryPath := entry.Path
	oldPrimaryIQN := entry.IQN
	oldPrimaryISCSI := entry.ISCSIAddr
	// Atomically bump epoch inside lock (R2-F5: prevents race with heartbeat updates).
	newEpoch := entry.Epoch + 1
	newPrimaryServer := entry.ReplicaServer
	// Promote replica to primary.
	entry.VolumeServer = entry.ReplicaServer
	entry.Path = entry.ReplicaPath
	entry.IQN = entry.ReplicaIQN
	entry.ISCSIAddr = entry.ReplicaISCSIAddr
	entry.Epoch = newEpoch
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary) // R2-F3
	entry.LastLeaseGrant = time.Now()
	// Drop the promoted server from Replicas[] (if tracked there) so it is
	// not listed as both primary and replica. This also re-syncs the scalar
	// fields, which are overwritten with the old primary just below.
	if entry.ReplicaByServer(newPrimaryServer) != nil {
		r.removeReplicaLocked(entry, newPrimaryServer, name)
	}
	// Old primary becomes stale replica (will be rebuilt when it reconnects).
	entry.ReplicaServer = oldPrimaryServer
	entry.ReplicaPath = oldPrimaryPath
	entry.ReplicaIQN = oldPrimaryIQN
	entry.ReplicaISCSIAddr = oldPrimaryISCSI
	entry.ReplicaDataAddr = ""
	entry.ReplicaCtrlAddr = ""
	// Update byServer index: new primary hosts the volume, and the demoted
	// primary stays indexed as a replica host.
	r.addToServer(entry.VolumeServer, name)
	r.addToServer(oldPrimaryServer, name)
	return newEpoch, nil
}
// AddReplica adds or replaces a replica in the Replicas slice (by server).
// Also updates the byServer index and deprecated scalar fields for backward compat.
func (r *BlockVolumeRegistry) AddReplica(name string, info ReplicaInfo) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return fmt.Errorf("block volume %q not found", name)
	}
	// Locate an existing record for the same server.
	idx := -1
	for i := range entry.Replicas {
		if entry.Replicas[i].Server == info.Server {
			idx = i
			break
		}
	}
	if idx >= 0 {
		entry.Replicas[idx] = info
	} else {
		entry.Replicas = append(entry.Replicas, info)
	}
	r.addToServer(info.Server, name)
	// Mirror the first replica into the deprecated scalar fields.
	if len(entry.Replicas) > 0 {
		first := entry.Replicas[0]
		entry.ReplicaServer = first.Server
		entry.ReplicaPath = first.Path
		entry.ReplicaISCSIAddr = first.ISCSIAddr
		entry.ReplicaIQN = first.IQN
		entry.ReplicaDataAddr = first.DataAddr
		entry.ReplicaCtrlAddr = first.CtrlAddr
	}
	return nil
}
// RemoveReplica removes a replica by server address.
// Returns an error when the volume or the replica is unknown.
//
// Refactored to delegate to removeReplicaLocked, which already implements
// the removal plus the deprecated-scalar-field sync; the previous inline
// copy of that logic was duplicated and could drift.
func (r *BlockVolumeRegistry) RemoveReplica(name, server string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return fmt.Errorf("block volume %q not found", name)
	}
	if entry.ReplicaByServer(server) == nil {
		return fmt.Errorf("replica on %q not found for volume %q", server, name)
	}
	r.removeFromServer(server, name)
	// Removes the replica and re-syncs the deprecated scalar fields.
	r.removeReplicaLocked(entry, server, name)
	return nil
}
// SetPromotionLSNTolerance configures the max WAL LSN lag for promotion eligibility.
func (r *BlockVolumeRegistry) SetPromotionLSNTolerance(tolerance uint64) {
	r.mu.Lock()
	r.promotionLSNTolerance = tolerance
	r.mu.Unlock()
}
// PromotionLSNTolerance returns the current promotion LSN tolerance.
func (r *BlockVolumeRegistry) PromotionLSNTolerance() uint64 {
	r.mu.RLock()
	tolerance := r.promotionLSNTolerance
	r.mu.RUnlock()
	return tolerance
}
// PromotionRejection records why a specific replica was rejected for promotion.
type PromotionRejection struct {
	Server string
	Reason string // "no_heartbeat", "stale_heartbeat", "wal_lag", "wrong_role", "server_dead"
}
// PromotionPreflightResult is the reusable result of a promotion evaluation.
// Used by auto-promotion, manual promote API, preflight status, and logging.
type PromotionPreflightResult struct {
	VolumeName   string
	Promotable   bool                 // true if a candidate was found
	Candidate    *ReplicaInfo         // best candidate (nil if !Promotable); points at a copy, not the live slice element
	CandidateIdx int                  // index in Replicas[] (-1 if !Promotable)
	Rejections   []PromotionRejection // why each non-candidate was rejected
	Reason       string               // human-readable summary when !Promotable
}
// evaluatePromotionLocked evaluates promotion candidates for a volume.
// Caller must hold r.mu (read or write). Returns a preflight result without
// mutating the registry. The four gates, applied in order per replica:
//  1. Heartbeat freshness (within 2×LeaseTTL; zero time = never heartbeated)
//  2. WAL LSN recency (within promotionLSNTolerance of primary)
//  3. Role must be RoleReplica (not RoleRebuilding)
//  4. Server must be in blockServers (alive) — fixes B-12
//
// Each replica collects at most one rejection (the first failing gate).
// Among survivors, the best is the highest HealthScore, tie-broken by
// highest WALHeadLSN, then by position in the list.
func (r *BlockVolumeRegistry) evaluatePromotionLocked(entry *BlockVolumeEntry) PromotionPreflightResult {
	result := PromotionPreflightResult{
		VolumeName:   entry.Name,
		CandidateIdx: -1,
	}
	if len(entry.Replicas) == 0 {
		result.Reason = "no replicas"
		return result
	}
	now := time.Now()
	// Default freshness window when the entry has no LeaseTTL recorded.
	freshnessCutoff := 2 * entry.LeaseTTL
	if freshnessCutoff == 0 {
		freshnessCutoff = 60 * time.Second
	}
	primaryLSN := entry.WALHeadLSN
	bestIdx := -1
	for i := range entry.Replicas {
		ri := &entry.Replicas[i]
		// Gate 1: heartbeat freshness. Zero means never heartbeated — unsafe
		// to promote because the registry has no proof the replica is alive,
		// caught up, or fully initialized.
		if ri.LastHeartbeat.IsZero() {
			result.Rejections = append(result.Rejections, PromotionRejection{
				Server: ri.Server,
				Reason: "no_heartbeat",
			})
			continue
		}
		if now.Sub(ri.LastHeartbeat) > freshnessCutoff {
			result.Rejections = append(result.Rejections, PromotionRejection{
				Server: ri.Server,
				Reason: "stale_heartbeat",
			})
			continue
		}
		// Gate 2: WAL LSN recency (skip if primary LSN is 0 — no data yet, all eligible).
		if primaryLSN > 0 && ri.WALHeadLSN+r.promotionLSNTolerance < primaryLSN {
			result.Rejections = append(result.Rejections, PromotionRejection{
				Server: ri.Server,
				Reason: "wal_lag",
			})
			continue
		}
		// Gate 3: role must be exactly RoleReplica. Zero/unset role means
		// the replica was created but never confirmed its role via heartbeat.
		if blockvol.RoleFromWire(ri.Role) != blockvol.RoleReplica {
			result.Rejections = append(result.Rejections, PromotionRejection{
				Server: ri.Server,
				Reason: "wrong_role",
			})
			continue
		}
		// Gate 4: server must be alive (in blockServers set) — B-12 fix.
		if !r.blockServers[ri.Server] {
			result.Rejections = append(result.Rejections, PromotionRejection{
				Server: ri.Server,
				Reason: "server_dead",
			})
			continue
		}
		// Eligible — pick best by health score, tie-break by WALHeadLSN.
		if bestIdx == -1 {
			bestIdx = i
		} else if ri.HealthScore > entry.Replicas[bestIdx].HealthScore {
			bestIdx = i
		} else if ri.HealthScore == entry.Replicas[bestIdx].HealthScore &&
			ri.WALHeadLSN > entry.Replicas[bestIdx].WALHeadLSN {
			bestIdx = i
		}
	}
	if bestIdx == -1 {
		// Summarize the first rejection (plus a count of the rest) for logs.
		result.Reason = "no eligible replicas"
		if len(result.Rejections) > 0 {
			result.Reason += ": " + result.Rejections[0].Reason
			if len(result.Rejections) > 1 {
				result.Reason += fmt.Sprintf(" (+%d more)", len(result.Rejections)-1)
			}
		}
		return result
	}
	result.Promotable = true
	// Candidate points at a local copy so it remains valid after the
	// registry lock is released (not at the live slice element).
	ri := entry.Replicas[bestIdx]
	result.Candidate = &ri
	result.CandidateIdx = bestIdx
	return result
}
// EvaluatePromotion returns a read-only preflight result for the named volume
// without mutating the registry. Safe for status/logging/manual promote preview.
func (r *BlockVolumeRegistry) EvaluatePromotion(name string) (PromotionPreflightResult, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	entry := r.volumes[name]
	if entry == nil {
		return PromotionPreflightResult{VolumeName: name, Reason: "volume not found"},
			fmt.Errorf("block volume %q not found", name)
	}
	return r.evaluatePromotionLocked(entry), nil
}
// applyPromotionLocked applies the promotion of a replica at candidateIdx to primary.
// Caller must hold r.mu (write lock). The promoted replica is removed from Replicas[].
// Old primary is NOT added to Replicas (needs rebuild). Returns the new epoch.
func (r *BlockVolumeRegistry) applyPromotionLocked(entry *BlockVolumeEntry, name string, candidate ReplicaInfo, candidateIdx int) uint64 {
	// Drop the outgoing primary from the byServer index.
	r.removeFromServer(entry.VolumeServer, name)

	// Epoch bump happens under the write lock, so it cannot race heartbeats.
	newEpoch := entry.Epoch + 1

	// Install the candidate as the new primary.
	entry.VolumeServer = candidate.Server
	entry.Path = candidate.Path
	entry.IQN = candidate.IQN
	entry.ISCSIAddr = candidate.ISCSIAddr
	entry.NvmeAddr = candidate.NvmeAddr
	entry.NQN = candidate.NQN
	entry.Epoch = newEpoch
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary)
	entry.LastLeaseGrant = time.Now()
	// Clear stale rebuild/publication metadata from old primary (B-11 partial fix).
	entry.RebuildListenAddr = ""

	// Drop the promoted entry from Replicas; the others are untouched.
	entry.Replicas = append(entry.Replicas[:candidateIdx], entry.Replicas[candidateIdx+1:]...)

	// Mirror the (possibly absent) first remaining replica into the
	// deprecated scalar fields; the zero value clears them.
	var scalar ReplicaInfo
	if len(entry.Replicas) > 0 {
		scalar = entry.Replicas[0]
	}
	entry.ReplicaServer = scalar.Server
	entry.ReplicaPath = scalar.Path
	entry.ReplicaISCSIAddr = scalar.ISCSIAddr
	entry.ReplicaIQN = scalar.IQN
	entry.ReplicaDataAddr = scalar.DataAddr
	entry.ReplicaCtrlAddr = scalar.CtrlAddr

	// The new primary's server now hosts this volume.
	r.addToServer(entry.VolumeServer, name)
	return newEpoch
}
// PromoteBestReplica promotes the best eligible replica to primary.
// Eligibility: heartbeat fresh (within 2×LeaseTTL), WALHeadLSN within tolerance of primary,
// role must be RoleReplica (not RoleRebuilding), and server must be alive (B-12 fix).
// The promoted replica is removed from Replicas[]. Other replicas stay.
// Old primary is NOT added to Replicas (needs rebuild).
// Returns the new epoch, or an error describing why no replica is promotable.
func (r *BlockVolumeRegistry) PromoteBestReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry := r.volumes[name]
	if entry == nil {
		return 0, fmt.Errorf("block volume %q not found", name)
	}
	preflight := r.evaluatePromotionLocked(entry)
	if !preflight.Promotable {
		return 0, fmt.Errorf("block volume %q: %s", name, preflight.Reason)
	}
	return r.applyPromotionLocked(entry, name, *preflight.Candidate, preflight.CandidateIdx), nil
}
// evaluateManualPromotionLocked evaluates promotion candidates for a manual promote request.
// Caller must hold r.mu (read or write).
//
// Differences from evaluatePromotionLocked:
//   - Primary-alive gate: if !force and current primary is alive, reject with "primary_alive".
//   - Target filtering: if targetServer != "", only evaluate that specific replica.
//     Returns Reason="target_not_found" if that server is not a replica.
//   - Force flag: bypasses soft gates (primary_alive, stale_heartbeat, wal_lag)
//     but keeps hard gates (no_heartbeat with zero time, wrong_role, server_dead).
//
// Gate table:
//
//	Gate            | Normal | Force
//	primary_alive   | Reject | Skip
//	no_heartbeat(0) | Reject | Reject
//	stale_heartbeat | Reject | Skip
//	wal_lag         | Reject | Skip
//	wrong_role      | Reject | Reject
//	server_dead     | Reject | Reject
func (r *BlockVolumeRegistry) evaluateManualPromotionLocked(entry *BlockVolumeEntry, targetServer string, force bool) PromotionPreflightResult {
	out := PromotionPreflightResult{
		VolumeName:   entry.Name,
		CandidateIdx: -1,
	}
	// Soft gate: while the current primary is alive, manual promotion is refused unless forced.
	if !force && r.blockServers[entry.VolumeServer] {
		out.Reason = "primary_alive"
		return out
	}
	if len(entry.Replicas) == 0 {
		out.Reason = "no replicas"
		return out
	}
	// When a specific target is requested, verify up front that it is actually a replica.
	if targetServer != "" {
		known := false
		for i := range entry.Replicas {
			if entry.Replicas[i].Server == targetServer {
				known = true
				break
			}
		}
		if !known {
			out.Reason = "target_not_found"
			return out
		}
	}
	now := time.Now()
	cutoff := 2 * entry.LeaseTTL
	if cutoff == 0 {
		cutoff = 60 * time.Second
	}
	primaryLSN := entry.WALHeadLSN
	best := -1
	for i := range entry.Replicas {
		rep := &entry.Replicas[i]
		if targetServer != "" && rep.Server != targetServer {
			continue // targeting a specific server: ignore all others
		}
		// Gates are evaluated in order; the first failing gate names the rejection.
		var reject string
		switch {
		case rep.LastHeartbeat.IsZero():
			// Hard gate: never heard from — unsafe regardless of force.
			reject = "no_heartbeat"
		case !force && now.Sub(rep.LastHeartbeat) > cutoff:
			// Soft gate: heartbeat too old — skipped when force=true.
			reject = "stale_heartbeat"
		case !force && primaryLSN > 0 && rep.WALHeadLSN+r.promotionLSNTolerance < primaryLSN:
			// Soft gate: too far behind the primary's WAL — skipped when force=true.
			reject = "wal_lag"
		case blockvol.RoleFromWire(rep.Role) != blockvol.RoleReplica:
			// Hard gate: only plain replicas may be promoted.
			reject = "wrong_role"
		case !r.blockServers[rep.Server]:
			// Hard gate: the replica's server must be in the alive set.
			reject = "server_dead"
		}
		if reject != "" {
			out.Rejections = append(out.Rejections, PromotionRejection{
				Server: rep.Server,
				Reason: reject,
			})
			continue
		}
		// Eligible — keep the healthiest candidate; break health ties by highest WAL LSN.
		if best == -1 {
			best = i
			continue
		}
		cur := &entry.Replicas[best]
		if rep.HealthScore > cur.HealthScore ||
			(rep.HealthScore == cur.HealthScore && rep.WALHeadLSN > cur.WALHeadLSN) {
			best = i
		}
	}
	if best == -1 {
		out.Reason = "no eligible replicas"
		if len(out.Rejections) > 0 {
			out.Reason += ": " + out.Rejections[0].Reason
			if len(out.Rejections) > 1 {
				out.Reason += fmt.Sprintf(" (+%d more)", len(out.Rejections)-1)
			}
		}
		return out
	}
	out.Promotable = true
	candidate := entry.Replicas[best]
	out.Candidate = &candidate
	out.CandidateIdx = best
	return out
}
// ManualPromote promotes a specific replica (or the best eligible replica) to primary.
// Unlike PromoteBestReplica, it accepts operator overrides:
//   - targetServer: if non-empty, only that replica is considered.
//   - force: bypasses soft gates (primary_alive, stale_heartbeat, wal_lag).
//
// Returns (newEpoch, oldPrimary, oldPath, preflightResult, nil) on success.
// oldPrimary and oldPath are captured under the lock to avoid TOCTOU with
// concurrent auto-failover (BUG-T5-2 fix).
// Returns (0, "", "", preflightResult, err) on rejection or lookup failure.
func (r *BlockVolumeRegistry) ManualPromote(name, targetServer string, force bool) (uint64, string, string, PromotionPreflightResult, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		missing := PromotionPreflightResult{VolumeName: name, Reason: "volume not found"}
		return 0, "", "", missing, fmt.Errorf("block volume %q not found", name)
	}
	// Snapshot the old primary's identity while still holding the lock (BUG-T5-2 fix).
	prevServer, prevPath := entry.VolumeServer, entry.Path
	pf := r.evaluateManualPromotionLocked(entry, targetServer, force)
	if !pf.Promotable {
		return 0, "", "", pf, fmt.Errorf("block volume %q: %s", name, pf.Reason)
	}
	epoch := r.applyPromotionLocked(entry, name, *pf.Candidate, pf.CandidateIdx)
	return epoch, prevServer, prevPath, pf, nil
}
// MarkBlockCapable records that the given server supports block volumes.
func (r *BlockVolumeRegistry) MarkBlockCapable(server string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.blockServers[server] = true
}
// UnmarkBlockCapable removes a server from the block-capable set.
func (r *BlockVolumeRegistry) UnmarkBlockCapable(server string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.blockServers, server)
}
// LeaseGrant holds the minimal fields for a lease renewal.
// Produced by LeaseGrants for active primaries on a server; the receiving
// volume server validates Epoch before refreshing the lease.
type LeaseGrant struct {
	Path       string // volume file path on the target server
	Epoch      uint64 // current ownership epoch, checked by the receiver
	Role       uint32 // wire-encoded role (grants are issued for primaries only)
	LeaseTtlMs uint32 // lease TTL in milliseconds (wire form of entry.LeaseTTL)
}
// LeaseGrants generates lightweight lease renewals for all active primary
// volumes on a server. Only primaries need lease renewal — replicas are passive
// WAL receivers without a write lease. Grants carry path + epoch + role + TTL
// and are processed by HandleAssignment's same-role refresh path, which
// validates the epoch and calls lease.Grant().
// Volumes with a pending assignment are excluded (the full assignment handles lease).
func (r *BlockVolumeRegistry) LeaseGrants(server string, pendingPaths map[string]bool) []LeaseGrant {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := r.byServer[server]
	if names == nil {
		return nil
	}
	var out []LeaseGrant
	for name := range names {
		e := r.volumes[name]
		switch {
		case e == nil || e.Status != StatusActive:
			continue
		case blockvol.RoleFromWire(e.Role) != blockvol.RolePrimary:
			// Replicas are passive WAL receivers and hold no write lease.
			continue
		case e.VolumeServer != server:
			// The primary for this volume lives on a different server.
			continue
		case pendingPaths[e.Path]:
			// A pending full assignment carries the lease itself.
			continue
		}
		out = append(out, LeaseGrant{
			Path:       e.Path,
			Epoch:      e.Epoch,
			Role:       e.Role,
			LeaseTtlMs: blockvol.LeaseTTLToWire(e.LeaseTTL),
		})
	}
	return out
}
// ListAll returns all registered block volume entries, sorted by name.
// NOTE: the returned pointers alias the registry's internal entries (no copy
// is made); callers should treat them as read-only.
func (r *BlockVolumeRegistry) ListAll() []*BlockVolumeEntry {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*BlockVolumeEntry, 0, len(r.volumes))
	for _, entry := range r.volumes {
		out = append(out, entry)
	}
	sort.Slice(out, func(a, b int) bool { return out[a].Name < out[b].Name })
	return out
}
// BlockServerSummary summarizes a block-capable volume server.
type BlockServerSummary struct {
	Address      string // server address, as tracked in the block-capable set
	VolumeCount  int    // number of volumes indexed under this server in byServer
	BlockCapable bool   // always true for entries produced by ServerSummaries
}
// ServerSummaries returns a summary for each block-capable server.
func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]BlockServerSummary, 0, len(r.blockServers))
	for addr := range r.blockServers {
		out = append(out, BlockServerSummary{
			Address: addr,
			// len of a missing (nil) map entry is 0, so no presence check is needed.
			VolumeCount:  len(r.byServer[addr]),
			BlockCapable: true,
		})
	}
	sort.Slice(out, func(a, b int) bool { return out[a].Address < out[b].Address })
	return out
}
// IsBlockCapable returns true if the given server is in the block-capable set (alive).
func (r *BlockVolumeRegistry) IsBlockCapable(server string) bool {
	r.mu.RLock()
	alive := r.blockServers[server]
	r.mu.RUnlock()
	return alive
}
// VolumesWithDeadPrimary returns names of volumes where the given server is a replica
// and the current primary is NOT in the block-capable set (dead/disconnected).
// Used by T2 (B-06) to detect orphaned primaries that need re-promotion.
func (r *BlockVolumeRegistry) VolumesWithDeadPrimary(replicaServer string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names, ok := r.byServer[replicaServer]
	if !ok {
		return nil
	}
	var orphaned []string
	for name := range names {
		e := r.volumes[name]
		// Skip unknown entries and volumes where this server IS the primary —
		// only the replica relationship matters here.
		if e == nil || e.VolumeServer == replicaServer {
			continue
		}
		if r.blockServers[e.VolumeServer] {
			continue // primary still alive
		}
		orphaned = append(orphaned, name)
	}
	return orphaned
}
// BlockCapableServers returns the list of servers known to support block volumes.
// Map iteration order is random, so the result is unordered.
func (r *BlockVolumeRegistry) BlockCapableServers() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.blockServers))
	for addr := range r.blockServers {
		out = append(out, addr)
	}
	return out
}
// MaxBarrierLagLSN returns the maximum WAL lag across all volumes and replicas.
// This is the primary durability-risk metric: primary WALHeadLSN minus
// replica's WALHeadLSN. SLO threshold: < 100 under normal load.
func (r *BlockVolumeRegistry) MaxBarrierLagLSN() uint64 {
	r.mu.RLock()
	defer r.mu.RUnlock()
	var worst uint64
	for _, entry := range r.volumes {
		// Index loop avoids copying each ReplicaInfo struct.
		for i := range entry.Replicas {
			worst = max(worst, entry.Replicas[i].WALLag)
		}
	}
	return worst
}
// AssignmentQueueDepth returns the total number of pending assignments across all servers.
// The registry does not track assignments itself — the assignment queue owns its own
// depth — so this is a placeholder that always reports zero.
func (r *BlockVolumeRegistry) AssignmentQueueDepth() int {
	return 0
}
// nameFromPath extracts the volume name from a .blk file path.
// e.g. "/opt/data/block/my-volume.blk" -> "my-volume"
// Paths without the .blk suffix return their base name unchanged.
func nameFromPath(path string) string {
	// TrimSuffix is already a no-op when the suffix is absent,
	// so the previous HasSuffix guard was redundant.
	return strings.TrimSuffix(filepath.Base(path), ".blk")
}