You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

994 lines
30 KiB

package weed_server
import (
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// VolumeStatus tracks the lifecycle of a block volume entry.
// The zero value is StatusPending.
type VolumeStatus int
// Lifecycle states of a registry entry. The heartbeat handlers
// (UpdateFullHeartbeat / UpdateDeltaHeartbeat) move an entry to StatusActive.
const (
StatusPending VolumeStatus = iota // Created via RPC, not yet confirmed by heartbeat
StatusActive // Confirmed by heartbeat from volume server
)
// ReplicaInfo tracks one replica of a block volume (CP8-2).
//
// The heartbeat-derived fields (HealthScore, WALHeadLSN, WALLag, LastHeartbeat,
// Role) are refreshed by UpdateFullHeartbeat when the replica's server reports.
// PromoteBestReplica treats a zero LastHeartbeat and a zero Role as "no
// information" and does not reject the replica on those gates.
type ReplicaInfo struct {
Server string // replica VS address
Path string // file path on replica VS
ISCSIAddr string // iSCSI target address
IQN string // iSCSI qualified name
DataAddr string // WAL receiver data listen addr
CtrlAddr string // WAL receiver ctrl listen addr
HealthScore float64 // from heartbeat (0.0-1.0)
WALHeadLSN uint64 // from heartbeat
WALLag uint64 // computed: primary.WALHeadLSN - replica.WALHeadLSN (clamped at 0)
LastHeartbeat time.Time // last heartbeat received from this replica
Role uint32 // replica role (RoleReplica, RoleRebuilding, etc.)
}
const (
// DefaultPromotionLSNTolerance is the max WAL LSN lag allowed for promotion eligibility.
// It is the initial value installed by NewBlockVolumeRegistry.
// Configurable per-registry via SetPromotionLSNTolerance.
DefaultPromotionLSNTolerance uint64 = 100
)
// BlockVolumeEntry tracks one block volume across the cluster.
//
// The primary copy is described by VolumeServer/Path/IQN/ISCSIAddr; replicas
// live in Replicas, with the first replica mirrored into the deprecated scalar
// Replica* fields by the registry's mutating methods. Entries are shared by
// pointer: Lookup, ListByServer and ListAll return the registry's own
// *BlockVolumeEntry rather than a copy.
type BlockVolumeEntry struct {
Name string
VolumeServer string // volume server address (ip:port or grpc addr)
Path string // file path on volume server
IQN string
ISCSIAddr string
SizeBytes uint64
ReplicaPlacement string // SeaweedFS placement string: "000", "001", "010", "100"
Epoch uint64
Role uint32
Status VolumeStatus
// Deprecated scalar replica fields (CP6-3). Use Replicas[] for new code.
ReplicaServer string
ReplicaPath string
ReplicaISCSIAddr string
ReplicaIQN string
ReplicaDataAddr string
ReplicaCtrlAddr string
RebuildListenAddr string // rebuild server listen addr on primary
// CP8-2: Multi-replica support.
ReplicaFactor int // 2 or 3 (default 2)
Replicas []ReplicaInfo // one per replica (RF-1 entries)
HealthScore float64 // primary health score from heartbeat
ReplicaDegraded bool // primary reports degraded replicas
WALHeadLSN uint64 // primary WAL head LSN from heartbeat
// CP8-3-1: Durability mode.
DurabilityMode string // "best_effort", "sync_all", "sync_quorum"
// Lease tracking for failover (CP6-3 F2).
LastLeaseGrant time.Time // set to time.Now() on primary heartbeat and on promotion
LeaseTTL time.Duration // zero means "not set"; PromoteBestReplica then uses a 60s freshness cutoff
}
// HasReplica reports whether at least one replica is recorded for the volume,
// looking at both the deprecated scalar ReplicaServer field and Replicas.
func (e *BlockVolumeEntry) HasReplica() bool {
	if e.ReplicaServer != "" {
		return true
	}
	return len(e.Replicas) != 0
}
// FirstReplica returns a pointer to Replicas[0], or nil when the volume has no
// entries in Replicas. The pointer aliases the entry's slice element.
func (e *BlockVolumeEntry) FirstReplica() *ReplicaInfo {
	if len(e.Replicas) == 0 {
		return nil
	}
	return &e.Replicas[0]
}
// ReplicaByServer returns a pointer to the first replica whose Server equals
// server, or nil when no replica matches. The pointer aliases the entry's
// slice element.
func (e *BlockVolumeEntry) ReplicaByServer(server string) *ReplicaInfo {
	for idx := 0; idx < len(e.Replicas); idx++ {
		candidate := &e.Replicas[idx]
		if candidate.Server != server {
			continue
		}
		return candidate
	}
	return nil
}
// BestReplicaForPromotion returns the preferred replica, or nil when Replicas
// is empty. Preference order: highest HealthScore, then highest WALHeadLSN,
// then earliest position in Replicas. No freshness, lag or role gating is
// applied here; it ranks every replica in the slice.
func (e *BlockVolumeEntry) BestReplicaForPromotion() *ReplicaInfo {
	var winner *ReplicaInfo
	for idx := range e.Replicas {
		cand := &e.Replicas[idx]
		switch {
		case winner == nil:
			winner = cand
		case cand.HealthScore > winner.HealthScore:
			winner = cand
		case cand.HealthScore == winner.HealthScore && cand.WALHeadLSN > winner.WALHeadLSN:
			winner = cand
		}
	}
	return winner
}
// BlockVolumeRegistry is the in-memory registry of block volumes.
// Rebuilt from heartbeats on master restart (no persistence).
//
// mu guards volumes, byServer, blockServers and promotionLSNTolerance.
// inflight is a sync.Map and is accessed without mu.
type BlockVolumeRegistry struct {
mu sync.RWMutex
volumes map[string]*BlockVolumeEntry // keyed by name
byServer map[string]map[string]bool // server -> set of volume names (primary and replica hosts)
blockServers map[string]bool // servers known to support block volumes
// Promotion eligibility: max WAL LSN lag for replica to be promotable.
promotionLSNTolerance uint64
// inflight guards concurrent CreateBlockVolume for the same name.
inflight sync.Map // name -> *inflightEntry
// Metrics (CP8-4).
// NOTE(review): none of these counters is incremented in this part of the
// file; presumably callers bump them — confirm.
PromotionsTotal atomic.Uint64
FailoversTotal atomic.Uint64
RebuildsTotal atomic.Uint64
}
// inflightEntry is the value stored in BlockVolumeRegistry.inflight by
// AcquireInflight; it carries no data and only marks a name as in progress.
type inflightEntry struct{}
// NewBlockVolumeRegistry returns a registry with no volumes, no known block
// servers, and the promotion tolerance set to DefaultPromotionLSNTolerance.
func NewBlockVolumeRegistry() *BlockVolumeRegistry {
	reg := new(BlockVolumeRegistry)
	reg.volumes = map[string]*BlockVolumeEntry{}
	reg.byServer = map[string]map[string]bool{}
	reg.blockServers = map[string]bool{}
	reg.promotionLSNTolerance = DefaultPromotionLSNTolerance
	return reg
}
// Register stores entry under entry.Name and indexes it under its primary
// server and under every server listed in entry.Replicas, so ListByServer
// finds the volume from either side.
// It returns an error, leaving the registry untouched, when the name is
// already taken.
func (r *BlockVolumeRegistry) Register(entry *BlockVolumeEntry) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	name := entry.Name
	if _, exists := r.volumes[name]; exists {
		return fmt.Errorf("block volume %q already registered", name)
	}

	r.volumes[name] = entry
	r.addToServer(entry.VolumeServer, name)
	for i := range entry.Replicas {
		r.addToServer(entry.Replicas[i].Server, name)
	}
	return nil
}
// Unregister drops the named volume from the registry and from the byServer
// index of its primary and of every server in its Replicas slice.
// It returns the removed entry, or nil when the name is unknown.
func (r *BlockVolumeRegistry) Unregister(name string) *BlockVolumeEntry {
	r.mu.Lock()
	defer r.mu.Unlock()

	removed, found := r.volumes[name]
	if !found {
		return nil
	}

	delete(r.volumes, name)
	r.removeFromServer(removed.VolumeServer, name)
	for i := range removed.Replicas {
		r.removeFromServer(removed.Replicas[i].Server, name)
	}
	return removed
}
// UpdateSize records newSizeBytes as the size of the named volume.
// Intended to run only after the volume server has successfully expanded the
// volume, so the registry stays in sync with it.
// It returns an error when the volume is not registered.
func (r *BlockVolumeRegistry) UpdateSize(name string, newSizeBytes uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if vol, found := r.volumes[name]; found {
		vol.SizeBytes = newSizeBytes
		return nil
	}
	return fmt.Errorf("block volume %q not found in registry", name)
}
// Lookup returns the registry's entry for name and whether it exists.
// The returned pointer is the registry's own entry, not a copy.
func (r *BlockVolumeRegistry) Lookup(name string) (*BlockVolumeEntry, bool) {
	r.mu.RLock()
	entry, found := r.volumes[name]
	r.mu.RUnlock()
	return entry, found
}
// ListByServer returns every volume indexed under server (as primary or as
// replica host). The result is nil when the server has no index entry; the
// order of the returned entries is unspecified.
func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry {
	r.mu.RLock()
	defer r.mu.RUnlock()

	hosted, known := r.byServer[server]
	if !known {
		return nil
	}

	result := make([]*BlockVolumeEntry, 0, len(hosted))
	for volName := range hosted {
		entry, present := r.volumes[volName]
		if !present {
			// Index entry without a backing volume: skip it.
			continue
		}
		result = append(result, entry)
	}
	return result
}
// UpdateFullHeartbeat reconciles the registry from a full heartbeat.
// Called on the first heartbeat from a volume server.
// Marks reported volumes as Active, removes entries for this server
// that are not reported (stale).
//
// The work happens in two passes under the write lock:
//
//  1. Prune: every volume indexed under server is checked against the set of
//     reported paths. If server is the volume's primary and its path is
//     missing, the whole volume is dropped. If server is only a replica and
//     its path is missing, just that replica is detached.
//  2. Merge: every reported volume is matched to an existing entry (first by
//     path among the volumes indexed under server, then by the name derived
//     from the path). A match is updated as primary, updated as replica, or
//     gains server as a new replica; no match auto-registers a new entry with
//     server as primary.
//
// server is the reporting volume server's address; infos are the block
// volumes it reports.
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage) {
r.mu.Lock()
defer r.mu.Unlock()
// Mark server as block-capable since it sent block volume info.
r.blockServers[server] = true
// Build set of reported paths.
reported := make(map[string]*master_pb.BlockVolumeInfoMessage, len(infos))
for _, info := range infos {
reported[info.Path] = info
}
// Find entries for this server that are NOT reported -> reconcile.
// Entries are deleted from names while ranging over it, which Go permits.
if names, ok := r.byServer[server]; ok {
for name := range names {
entry := r.volumes[name]
if entry == nil {
continue
}
if entry.VolumeServer == server {
// Server is the primary: check if primary path is reported.
if _, found := reported[entry.Path]; !found {
delete(r.volumes, name)
delete(names, name)
// Also clean up replica entries from byServer.
for _, ri := range entry.Replicas {
r.removeFromServer(ri.Server, name)
}
}
} else {
// Server is a replica: check if replica path is reported.
ri := entry.ReplicaByServer(server)
if ri == nil {
// No replica record — stale byServer index, just clean up.
delete(names, name)
continue
}
if _, found := reported[ri.Path]; !found {
// Replica path not reported — remove this replica, NOT the whole volume.
// removeReplicaLocked installs a fresh slice, so ri still points at the
// old element and ri.Path remains valid for the log line below.
r.removeReplicaLocked(entry, server, name)
delete(names, name)
glog.V(0).Infof("block registry: removed stale replica %s for %q (path %s not in heartbeat)",
server, name, ri.Path)
}
}
}
}
// Update or add entries for reported volumes.
for _, info := range infos {
// Find existing entry: search byServer index for matching path (primary or replica).
var existing *BlockVolumeEntry
var existingName string
if names, ok := r.byServer[server]; ok {
for vname := range names {
if e := r.volumes[vname]; e != nil {
if e.VolumeServer == server && e.Path == info.Path {
existing = e
existingName = vname
break
}
if ri := e.ReplicaByServer(server); ri != nil && ri.Path == info.Path {
existing = e
existingName = vname
break
}
}
}
}
// Also try lookup by name derived from path (handles post-restart).
if existing == nil {
name := nameFromPath(info.Path)
if name != "" {
if e, ok := r.volumes[name]; ok {
existing = e
existingName = name
}
}
}
if existing != nil {
isPrimary := existing.VolumeServer == server
isReplica := existing.ReplicaByServer(server) != nil
if isPrimary {
// Primary heartbeat: update primary fields.
existing.SizeBytes = info.VolumeSize
existing.Epoch = info.Epoch
existing.Role = info.Role
existing.Status = StatusActive
existing.LastLeaseGrant = time.Now()
existing.HealthScore = info.HealthScore
existing.ReplicaDegraded = info.ReplicaDegraded
existing.WALHeadLSN = info.WalHeadLsn
// F3: only update DurabilityMode when non-empty (prevents older VS from clearing strict mode).
if info.DurabilityMode != "" {
existing.DurabilityMode = info.DurabilityMode
}
// F5: update replica addresses from heartbeat info.
if info.ReplicaDataAddr != "" {
existing.ReplicaDataAddr = info.ReplicaDataAddr
}
if info.ReplicaCtrlAddr != "" {
existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr
}
// Sync first replica's data addrs to Replicas[].
// NOTE(review): CtrlAddr is overwritten with info.ReplicaCtrlAddr even when
// that value is empty, unlike the scalar field above — confirm intended.
if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 {
existing.Replicas[0].DataAddr = info.ReplicaDataAddr
existing.Replicas[0].CtrlAddr = info.ReplicaCtrlAddr
}
} else if isReplica {
// Replica heartbeat: update ReplicaInfo fields.
for i := range existing.Replicas {
if existing.Replicas[i].Server == server {
existing.Replicas[i].Path = info.Path
existing.Replicas[i].WALHeadLSN = info.WalHeadLsn
existing.Replicas[i].HealthScore = info.HealthScore
existing.Replicas[i].LastHeartbeat = time.Now()
existing.Replicas[i].Role = info.Role
// Lag is measured against the primary LSN last stored on the entry and
// clamped at zero so a replica ahead of that value does not underflow.
if existing.WALHeadLSN > info.WalHeadLsn {
existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn
} else {
existing.Replicas[i].WALLag = 0
}
break
}
}
} else {
// Fix #3: Server reports a volume that exists but has no record of this server.
// This happens after master restart: primary heartbeat re-created the entry,
// but replica heartbeat arrives and isn't linked. Add as replica.
ri := ReplicaInfo{
Server: server,
Path: info.Path,
HealthScore: info.HealthScore,
WALHeadLSN: info.WalHeadLsn,
LastHeartbeat: time.Now(),
Role: info.Role,
}
existing.Replicas = append(existing.Replicas, ri)
r.addToServer(server, existingName)
// Sync deprecated scalar fields (only server and path are known here).
if len(existing.Replicas) == 1 {
existing.ReplicaServer = ri.Server
existing.ReplicaPath = ri.Path
}
glog.V(0).Infof("block registry: attached replica %s for %q from heartbeat (path=%s)",
server, existingName, info.Path)
}
} else {
// Auto-register volumes reported by heartbeat but not in registry.
// This recovers state after master restart.
name := nameFromPath(info.Path)
if name == "" {
continue
}
if _, dup := r.volumes[name]; !dup {
// The reporting server becomes the primary; the lease TTL is fixed at 30s.
entry := &BlockVolumeEntry{
Name: name,
VolumeServer: server,
Path: info.Path,
SizeBytes: info.VolumeSize,
Epoch: info.Epoch,
Role: info.Role,
Status: StatusActive,
LastLeaseGrant: time.Now(),
LeaseTTL: 30 * time.Second,
HealthScore: info.HealthScore,
WALHeadLSN: info.WalHeadLsn,
DurabilityMode: info.DurabilityMode,
}
if info.ReplicaDataAddr != "" {
entry.ReplicaDataAddr = info.ReplicaDataAddr
}
if info.ReplicaCtrlAddr != "" {
entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr
}
r.volumes[name] = entry
r.addToServer(server, name)
glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
name, server, info.Path, info.VolumeSize)
}
}
}
}
// UpdateDeltaHeartbeat processes incremental new/deleted block volumes.
// Called on subsequent heartbeats (not the first).
//
// Removals are role-aware, mirroring UpdateFullHeartbeat:
//   - when server is the primary of the volume at the removed path, the whole
//     entry is dropped and the byServer index is cleaned for the primary and
//     for every replica server;
//   - when server only hosts a replica at the removed path, just that replica
//     is detached and the volume itself stays registered.
//
// Additions mark an already-registered volume indexed under server as
// StatusActive when its primary Path equals the reported path. Paths that
// match nothing are ignored.
func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Remove deleted volumes. Ranging over a missing (nil) index set is a no-op.
	for _, rm := range removed {
		for name := range r.byServer[server] {
			e := r.volumes[name]
			if e == nil {
				continue
			}
			if e.VolumeServer == server {
				if e.Path != rm.Path {
					continue
				}
				// The primary copy is gone: drop the volume and every index entry
				// that points at it, so replica servers do not keep a dangling name.
				delete(r.volumes, name)
				r.removeFromServer(server, name)
				for _, ri := range e.Replicas {
					r.removeFromServer(ri.Server, name)
				}
				break
			}
			// server is not the primary: only its own replica may be removed.
			// Matching on the primary path here would delete a healthy volume
			// whenever primary and replica share the same file path.
			ri := e.ReplicaByServer(server)
			if ri == nil || ri.Path != rm.Path {
				continue
			}
			r.removeReplicaLocked(e, server, name)
			r.removeFromServer(server, name)
			break
		}
	}

	// Mark newly appeared volumes as active (if they exist in registry).
	for _, add := range added {
		for name := range r.byServer[server] {
			if e := r.volumes[name]; e != nil && e.Path == add.Path {
				e.Status = StatusActive
				break
			}
		}
	}
}
// PickServer chooses, among servers (the online volume server addresses), the
// one with the fewest volumes indexed under it. Ties go to the server that
// appears first in the list.
// It returns an error when servers is empty.
func (r *BlockVolumeRegistry) PickServer(servers []string) (string, error) {
	if len(servers) == 0 {
		return "", fmt.Errorf("no block volume servers available")
	}

	r.mu.RLock()
	defer r.mu.RUnlock()

	chosen := 0
	fewest := r.countForServer(servers[0])
	for idx := 1; idx < len(servers); idx++ {
		if n := r.countForServer(servers[idx]); n < fewest {
			chosen, fewest = idx, n
		}
	}
	return servers[chosen], nil
}
// AcquireInflight attempts to take the per-name create lock.
// It returns true when the caller now holds the lock and must later call
// ReleaseInflight, and false when a create for the same name is already in
// progress.
func (r *BlockVolumeRegistry) AcquireInflight(name string) bool {
	// LoadOrStore reports loaded=true when a marker was already present,
	// i.e. someone else holds the lock.
	if _, alreadyHeld := r.inflight.LoadOrStore(name, &inflightEntry{}); alreadyHeld {
		return false
	}
	return true
}
// ReleaseInflight drops the per-name create lock taken by AcquireInflight.
// Releasing a name that is not held is a no-op.
func (r *BlockVolumeRegistry) ReleaseInflight(name string) {
	r.inflight.Delete(name)
}
// countForServer reports how many volume names are indexed under server
// (zero when the server has no index entry).
// Caller must hold at least RLock.
func (r *BlockVolumeRegistry) countForServer(server string) int {
	// len of a missing (nil) set is 0, so no existence check is needed.
	return len(r.byServer[server])
}
// addToServer records name in the byServer index under server, creating the
// server's set on first use. It takes no lock itself; every caller in this
// file already holds r.mu for writing.
func (r *BlockVolumeRegistry) addToServer(server, name string) {
	set := r.byServer[server]
	if set == nil {
		set = make(map[string]bool)
		r.byServer[server] = set
	}
	set[name] = true
}
// removeFromServer deletes name from server's set in the byServer index and
// drops the server's set entirely once it is empty. Unknown servers are
// ignored. It takes no lock itself; every caller in this file already holds
// r.mu for writing.
func (r *BlockVolumeRegistry) removeFromServer(server, name string) {
	set, known := r.byServer[server]
	if !known {
		return
	}
	delete(set, name)
	if len(set) > 0 {
		return
	}
	delete(r.byServer, server)
}
// removeReplicaLocked detaches every replica hosted on server from entry by
// installing a freshly allocated Replicas slice, then mirrors the new first
// replica (or empty values when none remain) into the deprecated scalar
// Replica* fields. The byServer index is not touched. name is accepted for
// call-site symmetry and is not used.
// Caller must hold r.mu.
func (r *BlockVolumeRegistry) removeReplicaLocked(entry *BlockVolumeEntry, server, name string) {
	kept := make([]ReplicaInfo, 0, len(entry.Replicas))
	for i := range entry.Replicas {
		if entry.Replicas[i].Server != server {
			kept = append(kept, entry.Replicas[i])
		}
	}
	entry.Replicas = kept

	// The zero ReplicaInfo clears the scalar fields when no replica is left.
	var first ReplicaInfo
	if len(kept) > 0 {
		first = kept[0]
	}
	entry.ReplicaServer = first.Server
	entry.ReplicaPath = first.Path
	entry.ReplicaISCSIAddr = first.ISCSIAddr
	entry.ReplicaIQN = first.IQN
	entry.ReplicaDataAddr = first.DataAddr
	entry.ReplicaCtrlAddr = first.CtrlAddr
}
// SetReplica sets replica info for a registered volume.
//
// Deprecated: use AddReplica for new code. This method syncs both scalar and Replicas[].
//
// The scalar Replica* fields are pointed at server and server is added to the
// byServer index. If a different server was previously recorded in
// ReplicaServer, it is removed from the index first. In Replicas[], an
// existing record for server has only its Path, ISCSIAddr and IQN updated, so
// heartbeat-derived state (HealthScore, WALHeadLSN, WALLag, LastHeartbeat,
// Role) and the WAL receiver addresses survive the call. Rebuilding the record
// from scratch would zero Role and LastHeartbeat, which PromoteBestReplica
// treats as "eligible". When no record exists for server, a new one is
// appended.
//
// It returns an error when the volume is not registered.
//
// NOTE(review): when the previous ReplicaServer differs from server it is
// removed from the byServer index but its record stays in Replicas[]; confirm
// whether it should be dropped there as well.
func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, ok := r.volumes[name]
	if !ok {
		return fmt.Errorf("block volume %q not found", name)
	}

	// Remove old replica from byServer index before replacing.
	if entry.ReplicaServer != "" && entry.ReplicaServer != server {
		r.removeFromServer(entry.ReplicaServer, name)
	}
	entry.ReplicaServer = server
	entry.ReplicaPath = path
	entry.ReplicaISCSIAddr = iscsiAddr
	entry.ReplicaIQN = iqn
	r.addToServer(server, name)

	// CP8-2: also sync to Replicas[].
	if ri := entry.ReplicaByServer(server); ri != nil {
		// Update in place so every heartbeat-derived field is preserved.
		ri.Path = path
		ri.ISCSIAddr = iscsiAddr
		ri.IQN = iqn
		return nil
	}
	entry.Replicas = append(entry.Replicas, ReplicaInfo{
		Server:    server,
		Path:      path,
		ISCSIAddr: iscsiAddr,
		IQN:       iqn,
	})
	return nil
}
// ClearReplica forgets every replica of the named volume: the replica servers
// are removed from the byServer index, Replicas is set to nil and all
// deprecated scalar Replica* fields are emptied.
//
// Deprecated: use RemoveReplica for new code. This method clears both scalar and Replicas[].
//
// It returns an error when the volume is not registered.
func (r *BlockVolumeRegistry) ClearReplica(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, found := r.volumes[name]
	if !found {
		return fmt.Errorf("block volume %q not found", name)
	}

	// Un-index every replica server; the scalar one is handled separately so
	// it is removed exactly once and only when it is set.
	scalar := entry.ReplicaServer
	for i := range entry.Replicas {
		if srv := entry.Replicas[i].Server; srv != scalar {
			r.removeFromServer(srv, name)
		}
	}
	if scalar != "" {
		r.removeFromServer(scalar, name)
	}

	entry.Replicas = nil
	entry.ReplicaServer, entry.ReplicaPath = "", ""
	entry.ReplicaISCSIAddr, entry.ReplicaIQN = "", ""
	entry.ReplicaDataAddr, entry.ReplicaCtrlAddr = "", ""
	return nil
}
// SwapPrimaryReplica exchanges the primary with the replica recorded in the
// deprecated scalar Replica* fields: the replica's server, path, IQN and
// iSCSI address become the primary's, and the old primary's values are stored
// in the scalar replica fields with empty data/ctrl addresses (it is expected
// to be rebuilt when it reconnects).
// The epoch is incremented under the registry lock (R2-F5) so the bump cannot
// race with heartbeat updates; the role is set to primary (R2-F3) and the
// lease grant time is reset to now.
// It returns the new epoch for use in assignment messages, or an error when
// the volume is unknown or has no scalar replica.
//
// NOTE(review): Replicas[] is not modified here, and the old primary is
// removed from the byServer index without being re-added as a replica —
// confirm this is intended for the deprecated single-replica path.
func (r *BlockVolumeRegistry) SwapPrimaryReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, found := r.volumes[name]
	switch {
	case !found:
		return 0, fmt.Errorf("block volume %q not found", name)
	case entry.ReplicaServer == "":
		return 0, fmt.Errorf("block volume %q has no replica", name)
	}

	// The outgoing primary leaves the index before the identities are swapped.
	r.removeFromServer(entry.VolumeServer, name)

	// Exchange primary and scalar-replica identities.
	entry.VolumeServer, entry.ReplicaServer = entry.ReplicaServer, entry.VolumeServer
	entry.Path, entry.ReplicaPath = entry.ReplicaPath, entry.Path
	entry.IQN, entry.ReplicaIQN = entry.ReplicaIQN, entry.IQN
	entry.ISCSIAddr, entry.ReplicaISCSIAddr = entry.ReplicaISCSIAddr, entry.ISCSIAddr

	// The demoted primary has no known WAL receiver addresses yet.
	entry.ReplicaDataAddr = ""
	entry.ReplicaCtrlAddr = ""

	entry.Epoch++
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary)
	entry.LastLeaseGrant = time.Now()

	// The new primary server now hosts this volume.
	r.addToServer(entry.VolumeServer, name)
	return entry.Epoch, nil
}
// AddReplica stores info in the volume's Replicas slice: a record with the
// same Server is overwritten wholesale, otherwise info is appended.
// The server is added to the byServer index and the deprecated scalar
// Replica* fields are refreshed from Replicas[0] for backward compatibility.
// It returns an error when the volume is not registered.
func (r *BlockVolumeRegistry) AddReplica(name string, info ReplicaInfo) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, found := r.volumes[name]
	if !found {
		return fmt.Errorf("block volume %q not found", name)
	}

	slot := -1
	for i := range entry.Replicas {
		if entry.Replicas[i].Server == info.Server {
			slot = i
			break
		}
	}
	if slot >= 0 {
		entry.Replicas[slot] = info
	} else {
		entry.Replicas = append(entry.Replicas, info)
	}
	r.addToServer(info.Server, name)

	// Replicas holds at least one element here, so index 0 is always valid.
	first := &entry.Replicas[0]
	entry.ReplicaServer = first.Server
	entry.ReplicaPath = first.Path
	entry.ReplicaISCSIAddr = first.ISCSIAddr
	entry.ReplicaIQN = first.IQN
	entry.ReplicaDataAddr = first.DataAddr
	entry.ReplicaCtrlAddr = first.CtrlAddr
	return nil
}
// RemoveReplica detaches the replica hosted on server from the named volume,
// removes that server from the byServer index for the volume, and refreshes
// the deprecated scalar Replica* fields from the remaining first replica
// (or clears them when none remain).
// It returns an error when the volume is unknown or has no replica on server;
// in both cases nothing is modified.
func (r *BlockVolumeRegistry) RemoveReplica(name, server string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, known := r.volumes[name]
	if !known {
		return fmt.Errorf("block volume %q not found", name)
	}
	if entry.ReplicaByServer(server) == nil {
		return fmt.Errorf("replica on %q not found for volume %q", server, name)
	}

	r.removeFromServer(server, name)
	// Rebuilds Replicas without server and syncs the scalar fields.
	r.removeReplicaLocked(entry, server, name)
	return nil
}
// SetPromotionLSNTolerance sets how many WAL LSNs a replica may trail the
// primary and still be eligible for promotion.
func (r *BlockVolumeRegistry) SetPromotionLSNTolerance(tolerance uint64) {
	r.mu.Lock()
	r.promotionLSNTolerance = tolerance
	r.mu.Unlock()
}
// PromotionLSNTolerance reports the WAL LSN lag currently tolerated for
// promotion eligibility.
func (r *BlockVolumeRegistry) PromotionLSNTolerance() uint64 {
	r.mu.RLock()
	current := r.promotionLSNTolerance
	r.mu.RUnlock()
	return current
}
// PromoteBestReplica promotes the best eligible replica to primary.
//
// Eligibility gates, applied to each entry of Replicas:
//  1. Heartbeat freshness: LastHeartbeat within 2×LeaseTTL (60s when LeaseTTL
//     is zero). A zero LastHeartbeat is not rejected.
//  2. WAL recency: the replica's WALHeadLSN trails the primary's by at most
//     the registry's promotion tolerance. The lag is computed by subtraction
//     so a very large tolerance cannot overflow and reject healthy replicas.
//  3. Role: must be RoleReplica (not RoleRebuilding). A zero wire role is not
//     rejected.
//
// Among eligible replicas the highest HealthScore wins, ties going to the
// highest WALHeadLSN and then to the earliest slice position.
//
// The promoted replica is removed from Replicas[]. Other replicas stay.
// Old primary is NOT added to Replicas (needs rebuild). The epoch is bumped
// under the lock, the role is set to primary, the lease grant time is reset,
// the byServer index is updated and the deprecated scalar Replica* fields are
// re-synced from the remaining first replica.
//
// Returns the new epoch, or an error when the volume is unknown, has no
// replicas, or none of them is eligible.
func (r *BlockVolumeRegistry) PromoteBestReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	entry, ok := r.volumes[name]
	if !ok {
		return 0, fmt.Errorf("block volume %q not found", name)
	}
	if len(entry.Replicas) == 0 {
		return 0, fmt.Errorf("block volume %q has no replicas", name)
	}

	// Filter eligible replicas.
	now := time.Now()
	freshnessCutoff := 2 * entry.LeaseTTL
	if freshnessCutoff == 0 {
		freshnessCutoff = 60 * time.Second // default if LeaseTTL not set
	}
	primaryLSN := entry.WALHeadLSN
	tolerance := r.promotionLSNTolerance

	bestIdx := -1
	for i := range entry.Replicas {
		ri := &entry.Replicas[i]
		// Gate 1: heartbeat freshness.
		if !ri.LastHeartbeat.IsZero() && now.Sub(ri.LastHeartbeat) > freshnessCutoff {
			continue
		}
		// Gate 2: WAL LSN recency. Only a replica that is actually behind can
		// fail; this also covers primaryLSN == 0 (no data yet, all eligible).
		// Subtracting instead of adding the tolerance avoids uint64 wrap-around.
		if primaryLSN > ri.WALHeadLSN && primaryLSN-ri.WALHeadLSN > tolerance {
			continue
		}
		// Gate 3: role must be RoleReplica (not rebuilding/stale).
		if ri.Role != 0 && blockvol.RoleFromWire(ri.Role) != blockvol.RoleReplica {
			continue
		}
		// Eligible — pick best by health score, tie-break by WALHeadLSN.
		switch {
		case bestIdx == -1:
			bestIdx = i
		case ri.HealthScore > entry.Replicas[bestIdx].HealthScore:
			bestIdx = i
		case ri.HealthScore == entry.Replicas[bestIdx].HealthScore &&
			ri.WALHeadLSN > entry.Replicas[bestIdx].WALHeadLSN:
			bestIdx = i
		}
	}
	if bestIdx == -1 {
		return 0, fmt.Errorf("block volume %q: no eligible replicas for promotion", name)
	}
	promoted := entry.Replicas[bestIdx]

	// Remove old primary from byServer index.
	r.removeFromServer(entry.VolumeServer, name)

	// Bump epoch atomically (under the registry lock).
	newEpoch := entry.Epoch + 1

	// Promote replica to primary.
	entry.VolumeServer = promoted.Server
	entry.Path = promoted.Path
	entry.IQN = promoted.IQN
	entry.ISCSIAddr = promoted.ISCSIAddr
	entry.Epoch = newEpoch
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary)
	entry.LastLeaseGrant = time.Now()

	// Remove promoted from Replicas. Others stay. A fresh slice is built (as
	// removeReplicaLocked does) so that *ReplicaInfo pointers previously handed
	// out into the old backing array are not silently rewritten by a shift.
	remaining := make([]ReplicaInfo, 0, len(entry.Replicas)-1)
	remaining = append(remaining, entry.Replicas[:bestIdx]...)
	remaining = append(remaining, entry.Replicas[bestIdx+1:]...)
	entry.Replicas = remaining

	// Sync deprecated scalar fields; the zero ReplicaInfo clears them when no
	// replica remains.
	var first ReplicaInfo
	if len(remaining) > 0 {
		first = remaining[0]
	}
	entry.ReplicaServer = first.Server
	entry.ReplicaPath = first.Path
	entry.ReplicaISCSIAddr = first.ISCSIAddr
	entry.ReplicaIQN = first.IQN
	entry.ReplicaDataAddr = first.DataAddr
	entry.ReplicaCtrlAddr = first.CtrlAddr

	// Update byServer index: new primary server now hosts this volume.
	r.addToServer(entry.VolumeServer, name)
	return newEpoch, nil
}
// MarkBlockCapable adds server to the set of servers known to support block
// volumes. Marking an already-known server is a no-op.
func (r *BlockVolumeRegistry) MarkBlockCapable(server string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.blockServers[server] = true
}
// UnmarkBlockCapable drops server from the set of block-capable servers.
// Unknown servers are ignored. Volumes indexed under the server are not
// affected.
func (r *BlockVolumeRegistry) UnmarkBlockCapable(server string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	delete(r.blockServers, server)
}
// LeaseGrant holds the minimal fields for a lease renewal.
// Values are produced by BlockVolumeRegistry.LeaseGrants from the matching
// BlockVolumeEntry.
type LeaseGrant struct {
Path string // primary file path of the volume on the server
Epoch uint64 // entry epoch at the time the grant was generated
Role uint32 // wire role of the entry (always a primary role when built by LeaseGrants)
LeaseTtlMs uint32 // entry LeaseTTL converted by blockvol.LeaseTTLToWire
}
// LeaseGrants builds lightweight lease renewals for the active primary
// volumes hosted on server. Replicas are passive WAL receivers without a
// write lease, so only primaries are renewed. Each grant carries
// path + epoch + role + TTL and is processed by HandleAssignment's same-role
// refresh path, which validates the epoch and calls lease.Grant().
//
// A volume is skipped when it is not StatusActive, its role is not primary,
// its primary is on another server, or its path is listed in pendingPaths
// (a full assignment is pending and will handle the lease itself).
// The result is nil when nothing qualifies; its order is unspecified.
func (r *BlockVolumeRegistry) LeaseGrants(server string, pendingPaths map[string]bool) []LeaseGrant {
	r.mu.RLock()
	defer r.mu.RUnlock()

	var out []LeaseGrant
	// Ranging over a missing (nil) index set simply yields no grants.
	for volName := range r.byServer[server] {
		vol := r.volumes[volName]
		if vol == nil || vol.Status != StatusActive {
			continue
		}
		skip := blockvol.RoleFromWire(vol.Role) != blockvol.RolePrimary || // replicas hold no lease
			vol.VolumeServer != server || // primary lives elsewhere
			pendingPaths[vol.Path] // assignment in flight
		if skip {
			continue
		}
		out = append(out, LeaseGrant{
			Path:       vol.Path,
			Epoch:      vol.Epoch,
			Role:       vol.Role,
			LeaseTtlMs: blockvol.LeaseTTLToWire(vol.LeaseTTL),
		})
	}
	return out
}
// ListAll returns every registered volume ordered by Name. The slice is
// freshly allocated, but its elements are the registry's own entries.
func (r *BlockVolumeRegistry) ListAll() []*BlockVolumeEntry {
	r.mu.RLock()
	defer r.mu.RUnlock()

	all := make([]*BlockVolumeEntry, len(r.volumes))
	next := 0
	for _, vol := range r.volumes {
		all[next] = vol
		next++
	}
	sort.Slice(all, func(a, b int) bool {
		return all[a].Name < all[b].Name
	})
	return all
}
// BlockServerSummary summarizes a block-capable volume server.
// Values are produced by BlockVolumeRegistry.ServerSummaries.
type BlockServerSummary struct {
Address string // server address as recorded in the block-capable set
VolumeCount int // number of volume names indexed under the server (primary or replica)
BlockCapable bool // always true for summaries built by ServerSummaries
}
// ServerSummaries returns one summary per block-capable server, ordered by
// address. VolumeCount is the number of volume names indexed under the
// server, zero when it hosts none.
func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary {
	r.mu.RLock()
	defer r.mu.RUnlock()

	out := make([]BlockServerSummary, 0, len(r.blockServers))
	for server := range r.blockServers {
		var summary BlockServerSummary
		summary.Address = server
		summary.VolumeCount = len(r.byServer[server]) // len of a missing set is 0
		summary.BlockCapable = true
		out = append(out, summary)
	}
	sort.Slice(out, func(a, b int) bool {
		return out[a].Address < out[b].Address
	})
	return out
}
// BlockCapableServers returns the addresses of all servers currently marked
// as block-capable. The order is unspecified (map iteration order).
func (r *BlockVolumeRegistry) BlockCapableServers() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	addrs := make([]string, len(r.blockServers))
	next := 0
	for addr := range r.blockServers {
		addrs[next] = addr
		next++
	}
	return addrs
}
// MaxBarrierLagLSN returns the largest WALLag recorded on any replica of any
// volume, or 0 when there are no replicas. WALLag is the primary's WALHeadLSN
// minus the replica's, making this the primary durability-risk metric.
// SLO threshold: < 100 under normal load.
func (r *BlockVolumeRegistry) MaxBarrierLagLSN() uint64 {
	r.mu.RLock()
	defer r.mu.RUnlock()

	worst := uint64(0)
	for _, vol := range r.volumes {
		for i := range vol.Replicas {
			if lag := vol.Replicas[i].WALLag; lag > worst {
				worst = lag
			}
		}
	}
	return worst
}
// AssignmentQueueDepth is meant to report the total number of pending
// assignments across all servers. The registry does not track that: the
// assignment queue owns its own depth, so this placeholder always returns 0.
func (r *BlockVolumeRegistry) AssignmentQueueDepth() int {
	const placeholderDepth = 0 // real depth lives in the assignment queue
	return placeholderDepth
}
// nameFromPath extracts the volume name from a .blk file path.
// e.g. "/opt/data/block/my-volume.blk" -> "my-volume"
//
// The name is the final path element with a trailing ".blk" removed; an
// element without that suffix is returned unchanged. The empty string is
// returned when the path has no usable final element — an empty path, a bare
// root, or a path that cleans to "." — so callers' `name == ""` guards
// actually reject such reports instead of registering a volume named "."
// or "/".
func nameFromPath(path string) string {
	if path == "" {
		return ""
	}
	base := filepath.Base(path)
	// filepath.Base never returns "": it yields "." for empty input and a
	// single separator for a path made only of separators.
	if base == "." || base == string(filepath.Separator) {
		return ""
	}
	return strings.TrimSuffix(base, ".blk")
}