You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
994 lines
30 KiB
994 lines
30 KiB
package weed_server
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
|
|
)
|
|
|
|
// VolumeStatus tracks the lifecycle of a block volume entry.
// Entries start as StatusPending when created via RPC and transition to
// StatusActive once the hosting volume server confirms them by heartbeat.
type VolumeStatus int

const (
	StatusPending VolumeStatus = iota // Created via RPC, not yet confirmed by heartbeat
	StatusActive                      // Confirmed by heartbeat from volume server
)
|
|
|
|
// ReplicaInfo tracks one replica of a block volume (CP8-2).
// Health/LSN fields are refreshed from heartbeats; WALLag is derived, not reported.
type ReplicaInfo struct {
	Server        string    // replica VS address
	Path          string    // file path on replica VS
	ISCSIAddr     string    // iSCSI target address
	IQN           string    // iSCSI qualified name
	DataAddr      string    // WAL receiver data listen addr
	CtrlAddr      string    // WAL receiver ctrl listen addr
	HealthScore   float64   // from heartbeat (0.0-1.0)
	WALHeadLSN    uint64    // from heartbeat
	WALLag        uint64    // computed: primary.WALHeadLSN - replica.WALHeadLSN (0 if replica is ahead)
	LastHeartbeat time.Time // last heartbeat received from this replica; zero value means "never"
	Role          uint32    // replica role (RoleReplica, RoleRebuilding, etc.)
}
|
|
|
|
const (
	// DefaultPromotionLSNTolerance is the max WAL LSN lag allowed for promotion eligibility.
	// Configurable per-registry via SetPromotionLSNTolerance.
	DefaultPromotionLSNTolerance uint64 = 100
)
|
|
|
|
// BlockVolumeEntry tracks one block volume across the cluster.
// The primary lives on VolumeServer/Path; replicas are tracked in Replicas[]
// (with the first replica mirrored into the deprecated scalar fields).
type BlockVolumeEntry struct {
	Name             string
	VolumeServer     string // volume server address (ip:port or grpc addr)
	Path             string // file path on volume server
	IQN              string
	ISCSIAddr        string
	SizeBytes        uint64
	ReplicaPlacement string // SeaweedFS placement string: "000", "001", "010", "100"
	Epoch            uint64
	Role             uint32
	Status           VolumeStatus

	// Deprecated scalar replica fields (CP6-3). Use Replicas[] for new code.
	// Kept in sync with Replicas[0] by the registry mutators.
	ReplicaServer     string
	ReplicaPath       string
	ReplicaISCSIAddr  string
	ReplicaIQN        string
	ReplicaDataAddr   string
	ReplicaCtrlAddr   string
	RebuildListenAddr string // rebuild server listen addr on primary

	// CP8-2: Multi-replica support.
	ReplicaFactor   int           // 2 or 3 (default 2)
	Replicas        []ReplicaInfo // one per replica (RF-1 entries)
	HealthScore     float64       // primary health score from heartbeat
	ReplicaDegraded bool          // primary reports degraded replicas
	WALHeadLSN      uint64        // primary WAL head LSN from heartbeat

	// CP8-3-1: Durability mode.
	DurabilityMode string // "best_effort", "sync_all", "sync_quorum"

	// Lease tracking for failover (CP6-3 F2).
	LastLeaseGrant time.Time
	LeaseTTL       time.Duration
}
|
|
|
|
// HasReplica returns true if this volume has any replica (checks both new and deprecated fields).
|
|
func (e *BlockVolumeEntry) HasReplica() bool {
|
|
return len(e.Replicas) > 0 || e.ReplicaServer != ""
|
|
}
|
|
|
|
// FirstReplica returns the first replica info, or nil if none.
|
|
func (e *BlockVolumeEntry) FirstReplica() *ReplicaInfo {
|
|
if len(e.Replicas) > 0 {
|
|
return &e.Replicas[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReplicaByServer returns the replica hosted on the given server, or nil.
|
|
func (e *BlockVolumeEntry) ReplicaByServer(server string) *ReplicaInfo {
|
|
for i := range e.Replicas {
|
|
if e.Replicas[i].Server == server {
|
|
return &e.Replicas[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BestReplicaForPromotion returns the best replica for promotion, or nil if none eligible.
|
|
// Criteria: highest HealthScore, tie-break by highest WALHeadLSN, then first in list.
|
|
func (e *BlockVolumeEntry) BestReplicaForPromotion() *ReplicaInfo {
|
|
if len(e.Replicas) == 0 {
|
|
return nil
|
|
}
|
|
best := 0
|
|
for i := 1; i < len(e.Replicas); i++ {
|
|
if e.Replicas[i].HealthScore > e.Replicas[best].HealthScore {
|
|
best = i
|
|
} else if e.Replicas[i].HealthScore == e.Replicas[best].HealthScore &&
|
|
e.Replicas[i].WALHeadLSN > e.Replicas[best].WALHeadLSN {
|
|
best = i
|
|
}
|
|
}
|
|
return &e.Replicas[best]
|
|
}
|
|
|
|
// BlockVolumeRegistry is the in-memory registry of block volumes.
// Rebuilt from heartbeats on master restart (no persistence).
// All maps are guarded by mu; the byServer index covers both primary and
// replica servers so ListByServer finds every hosted volume.
type BlockVolumeRegistry struct {
	mu           sync.RWMutex
	volumes      map[string]*BlockVolumeEntry // keyed by name
	byServer     map[string]map[string]bool   // server -> set of volume names
	blockServers map[string]bool              // servers known to support block volumes

	// Promotion eligibility: max WAL LSN lag for replica to be promotable.
	promotionLSNTolerance uint64

	// inflight guards concurrent CreateBlockVolume for the same name.
	inflight sync.Map // name -> *inflightEntry

	// Metrics (CP8-4).
	PromotionsTotal atomic.Uint64
	FailoversTotal  atomic.Uint64
	RebuildsTotal   atomic.Uint64
}
|
|
|
|
// inflightEntry is a zero-size marker stored in BlockVolumeRegistry.inflight
// while a create for a given volume name is in progress.
type inflightEntry struct{}
|
|
|
|
// NewBlockVolumeRegistry creates an empty registry.
|
|
func NewBlockVolumeRegistry() *BlockVolumeRegistry {
|
|
return &BlockVolumeRegistry{
|
|
volumes: make(map[string]*BlockVolumeEntry),
|
|
byServer: make(map[string]map[string]bool),
|
|
blockServers: make(map[string]bool),
|
|
promotionLSNTolerance: DefaultPromotionLSNTolerance,
|
|
}
|
|
}
|
|
|
|
// Register adds an entry to the registry.
|
|
// Returns error if a volume with the same name already exists.
|
|
func (r *BlockVolumeRegistry) Register(entry *BlockVolumeEntry) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if _, ok := r.volumes[entry.Name]; ok {
|
|
return fmt.Errorf("block volume %q already registered", entry.Name)
|
|
}
|
|
r.volumes[entry.Name] = entry
|
|
r.addToServer(entry.VolumeServer, entry.Name)
|
|
// Also index replica servers so ListByServer finds them.
|
|
for _, ri := range entry.Replicas {
|
|
r.addToServer(ri.Server, entry.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Unregister removes and returns the entry. Returns nil if not found.
|
|
func (r *BlockVolumeRegistry) Unregister(name string) *BlockVolumeEntry {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
delete(r.volumes, name)
|
|
r.removeFromServer(entry.VolumeServer, name)
|
|
for _, ri := range entry.Replicas {
|
|
r.removeFromServer(ri.Server, name)
|
|
}
|
|
return entry
|
|
}
|
|
|
|
// UpdateSize updates the size of a registered volume.
|
|
// Called only after a successful VS expand to keep registry in sync.
|
|
func (r *BlockVolumeRegistry) UpdateSize(name string, newSizeBytes uint64) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return fmt.Errorf("block volume %q not found in registry", name)
|
|
}
|
|
entry.SizeBytes = newSizeBytes
|
|
return nil
|
|
}
|
|
|
|
// Lookup returns the entry for the given name.
|
|
func (r *BlockVolumeRegistry) Lookup(name string) (*BlockVolumeEntry, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
e, ok := r.volumes[name]
|
|
return e, ok
|
|
}
|
|
|
|
// ListByServer returns all entries hosted on the given server.
|
|
func (r *BlockVolumeRegistry) ListByServer(server string) []*BlockVolumeEntry {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
names, ok := r.byServer[server]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
entries := make([]*BlockVolumeEntry, 0, len(names))
|
|
for name := range names {
|
|
if e, ok := r.volumes[name]; ok {
|
|
entries = append(entries, e)
|
|
}
|
|
}
|
|
return entries
|
|
}
|
|
|
|
// UpdateFullHeartbeat reconciles the registry from a full heartbeat.
// Called on the first heartbeat from a volume server.
//
// Three phases, all under the write lock:
//  1. Mark the sender block-capable and index the reported paths.
//  2. Reconcile: entries indexed for this server but NOT reported are stale —
//     a missing primary path drops the whole volume, a missing replica path
//     drops only that replica.
//  3. Upsert: reported volumes update the matching entry (primary or replica
//     fields), attach as a new replica, or auto-register a fresh entry
//     (recovers state after master restart).
func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master_pb.BlockVolumeInfoMessage) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Mark server as block-capable since it sent block volume info.
	r.blockServers[server] = true

	// Build set of reported paths for O(1) membership checks below.
	reported := make(map[string]*master_pb.BlockVolumeInfoMessage, len(infos))
	for _, info := range infos {
		reported[info.Path] = info
	}

	// Find entries for this server that are NOT reported -> reconcile.
	if names, ok := r.byServer[server]; ok {
		for name := range names {
			entry := r.volumes[name]
			if entry == nil {
				continue
			}
			if entry.VolumeServer == server {
				// Server is the primary: check if primary path is reported.
				if _, found := reported[entry.Path]; !found {
					delete(r.volumes, name)
					delete(names, name)
					// Also clean up replica entries from byServer.
					for _, ri := range entry.Replicas {
						r.removeFromServer(ri.Server, name)
					}
				}
			} else {
				// Server is a replica: check if replica path is reported.
				ri := entry.ReplicaByServer(server)
				if ri == nil {
					// No replica record — stale byServer index, just clean up.
					delete(names, name)
					continue
				}
				if _, found := reported[ri.Path]; !found {
					// Replica path not reported — remove this replica, NOT the whole volume.
					r.removeReplicaLocked(entry, server, name)
					delete(names, name)
					glog.V(0).Infof("block registry: removed stale replica %s for %q (path %s not in heartbeat)",
						server, name, ri.Path)
				}
			}
		}
	}

	// Update or add entries for reported volumes.
	for _, info := range infos {
		// Find existing entry: search byServer index for matching path (primary or replica).
		var existing *BlockVolumeEntry
		var existingName string
		if names, ok := r.byServer[server]; ok {
			for vname := range names {
				if e := r.volumes[vname]; e != nil {
					if e.VolumeServer == server && e.Path == info.Path {
						existing = e
						existingName = vname
						break
					}
					if ri := e.ReplicaByServer(server); ri != nil && ri.Path == info.Path {
						existing = e
						existingName = vname
						break
					}
				}
			}
		}
		// Also try lookup by name derived from path (handles post-restart).
		if existing == nil {
			name := nameFromPath(info.Path)
			if name != "" {
				if e, ok := r.volumes[name]; ok {
					existing = e
					existingName = name
				}
			}
		}

		if existing != nil {
			isPrimary := existing.VolumeServer == server
			isReplica := existing.ReplicaByServer(server) != nil

			if isPrimary {
				// Primary heartbeat: update primary fields.
				existing.SizeBytes = info.VolumeSize
				existing.Epoch = info.Epoch
				existing.Role = info.Role
				existing.Status = StatusActive
				existing.LastLeaseGrant = time.Now()
				existing.HealthScore = info.HealthScore
				existing.ReplicaDegraded = info.ReplicaDegraded
				existing.WALHeadLSN = info.WalHeadLsn
				// F3: only update DurabilityMode when non-empty (prevents older VS from clearing strict mode).
				if info.DurabilityMode != "" {
					existing.DurabilityMode = info.DurabilityMode
				}
				// F5: update replica addresses from heartbeat info.
				if info.ReplicaDataAddr != "" {
					existing.ReplicaDataAddr = info.ReplicaDataAddr
				}
				if info.ReplicaCtrlAddr != "" {
					existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr
				}
				// Sync first replica's data addrs to Replicas[].
				if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 {
					existing.Replicas[0].DataAddr = info.ReplicaDataAddr
					existing.Replicas[0].CtrlAddr = info.ReplicaCtrlAddr
				}
			} else if isReplica {
				// Replica heartbeat: update ReplicaInfo fields, including the
				// derived WALLag (clamped to 0 when the replica is ahead).
				for i := range existing.Replicas {
					if existing.Replicas[i].Server == server {
						existing.Replicas[i].Path = info.Path
						existing.Replicas[i].WALHeadLSN = info.WalHeadLsn
						existing.Replicas[i].HealthScore = info.HealthScore
						existing.Replicas[i].LastHeartbeat = time.Now()
						existing.Replicas[i].Role = info.Role
						if existing.WALHeadLSN > info.WalHeadLsn {
							existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn
						} else {
							existing.Replicas[i].WALLag = 0
						}
						break
					}
				}
			} else {
				// Fix #3: Server reports a volume that exists but has no record of this server.
				// This happens after master restart: primary heartbeat re-created the entry,
				// but replica heartbeat arrives and isn't linked. Add as replica.
				ri := ReplicaInfo{
					Server:        server,
					Path:          info.Path,
					HealthScore:   info.HealthScore,
					WALHeadLSN:    info.WalHeadLsn,
					LastHeartbeat: time.Now(),
					Role:          info.Role,
				}
				existing.Replicas = append(existing.Replicas, ri)
				r.addToServer(server, existingName)
				// Sync deprecated scalar fields.
				if len(existing.Replicas) == 1 {
					existing.ReplicaServer = ri.Server
					existing.ReplicaPath = ri.Path
				}
				glog.V(0).Infof("block registry: attached replica %s for %q from heartbeat (path=%s)",
					server, existingName, info.Path)
			}
		} else {
			// Auto-register volumes reported by heartbeat but not in registry.
			// This recovers state after master restart.
			name := nameFromPath(info.Path)
			if name == "" {
				continue
			}
			if _, dup := r.volumes[name]; !dup {
				entry := &BlockVolumeEntry{
					Name:           name,
					VolumeServer:   server,
					Path:           info.Path,
					SizeBytes:      info.VolumeSize,
					Epoch:          info.Epoch,
					Role:           info.Role,
					Status:         StatusActive,
					LastLeaseGrant: time.Now(),
					LeaseTTL:       30 * time.Second,
					HealthScore:    info.HealthScore,
					WALHeadLSN:     info.WalHeadLsn,
					DurabilityMode: info.DurabilityMode,
				}
				if info.ReplicaDataAddr != "" {
					entry.ReplicaDataAddr = info.ReplicaDataAddr
				}
				if info.ReplicaCtrlAddr != "" {
					entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr
				}
				r.volumes[name] = entry
				r.addToServer(server, name)
				glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
					name, server, info.Path, info.VolumeSize)
			}
		}
	}
}
|
|
|
|
// UpdateDeltaHeartbeat processes incremental new/deleted block volumes.
|
|
// Called on subsequent heartbeats (not the first).
|
|
func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Remove deleted volumes.
|
|
for _, rm := range removed {
|
|
if names, ok := r.byServer[server]; ok {
|
|
for name := range names {
|
|
if e := r.volumes[name]; e != nil && e.Path == rm.Path {
|
|
delete(r.volumes, name)
|
|
delete(names, name)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Mark newly appeared volumes as active (if they exist in registry).
|
|
for _, add := range added {
|
|
if names, ok := r.byServer[server]; ok {
|
|
for name := range names {
|
|
if e := r.volumes[name]; e != nil && e.Path == add.Path {
|
|
e.Status = StatusActive
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// PickServer returns the server address with the fewest block volumes.
|
|
// servers is the list of online volume server addresses.
|
|
// Returns error if no servers available.
|
|
func (r *BlockVolumeRegistry) PickServer(servers []string) (string, error) {
|
|
if len(servers) == 0 {
|
|
return "", fmt.Errorf("no block volume servers available")
|
|
}
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
best := servers[0]
|
|
bestCount := r.countForServer(best)
|
|
for _, s := range servers[1:] {
|
|
c := r.countForServer(s)
|
|
if c < bestCount {
|
|
best = s
|
|
bestCount = c
|
|
}
|
|
}
|
|
return best, nil
|
|
}
|
|
|
|
// AcquireInflight tries to acquire a per-name create lock.
|
|
// Returns true if acquired (caller must call ReleaseInflight when done).
|
|
// Returns false if another create is already in progress for this name.
|
|
func (r *BlockVolumeRegistry) AcquireInflight(name string) bool {
|
|
_, loaded := r.inflight.LoadOrStore(name, &inflightEntry{})
|
|
return !loaded // true = we stored it (acquired), false = already existed
|
|
}
|
|
|
|
// ReleaseInflight releases the per-name create lock.
|
|
func (r *BlockVolumeRegistry) ReleaseInflight(name string) {
|
|
r.inflight.Delete(name)
|
|
}
|
|
|
|
// countForServer returns the number of volumes on the given server.
|
|
// Caller must hold at least RLock.
|
|
func (r *BlockVolumeRegistry) countForServer(server string) int {
|
|
if names, ok := r.byServer[server]; ok {
|
|
return len(names)
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func (r *BlockVolumeRegistry) addToServer(server, name string) {
|
|
if r.byServer[server] == nil {
|
|
r.byServer[server] = make(map[string]bool)
|
|
}
|
|
r.byServer[server][name] = true
|
|
}
|
|
|
|
func (r *BlockVolumeRegistry) removeFromServer(server, name string) {
|
|
if names, ok := r.byServer[server]; ok {
|
|
delete(names, name)
|
|
if len(names) == 0 {
|
|
delete(r.byServer, server)
|
|
}
|
|
}
|
|
}
|
|
|
|
// removeReplicaLocked removes a replica from an entry by server address.
|
|
// Caller must hold r.mu. Also syncs deprecated scalar fields.
|
|
func (r *BlockVolumeRegistry) removeReplicaLocked(entry *BlockVolumeEntry, server, name string) {
|
|
newReplicas := make([]ReplicaInfo, 0, len(entry.Replicas))
|
|
for _, ri := range entry.Replicas {
|
|
if ri.Server == server {
|
|
continue
|
|
}
|
|
newReplicas = append(newReplicas, ri)
|
|
}
|
|
entry.Replicas = newReplicas
|
|
// Sync deprecated scalar fields.
|
|
if len(entry.Replicas) > 0 {
|
|
r0 := &entry.Replicas[0]
|
|
entry.ReplicaServer = r0.Server
|
|
entry.ReplicaPath = r0.Path
|
|
entry.ReplicaISCSIAddr = r0.ISCSIAddr
|
|
entry.ReplicaIQN = r0.IQN
|
|
entry.ReplicaDataAddr = r0.DataAddr
|
|
entry.ReplicaCtrlAddr = r0.CtrlAddr
|
|
} else {
|
|
entry.ReplicaServer = ""
|
|
entry.ReplicaPath = ""
|
|
entry.ReplicaISCSIAddr = ""
|
|
entry.ReplicaIQN = ""
|
|
entry.ReplicaDataAddr = ""
|
|
entry.ReplicaCtrlAddr = ""
|
|
}
|
|
}
|
|
|
|
// SetReplica sets replica info for a registered volume.
|
|
// Deprecated: use AddReplica for new code. This method syncs both scalar and Replicas[].
|
|
func (r *BlockVolumeRegistry) SetReplica(name, server, path, iscsiAddr, iqn string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return fmt.Errorf("block volume %q not found", name)
|
|
}
|
|
// Remove old replica from byServer index before replacing.
|
|
if entry.ReplicaServer != "" && entry.ReplicaServer != server {
|
|
r.removeFromServer(entry.ReplicaServer, name)
|
|
}
|
|
entry.ReplicaServer = server
|
|
entry.ReplicaPath = path
|
|
entry.ReplicaISCSIAddr = iscsiAddr
|
|
entry.ReplicaIQN = iqn
|
|
r.addToServer(server, name)
|
|
|
|
// CP8-2: also sync to Replicas[].
|
|
info := ReplicaInfo{Server: server, Path: path, ISCSIAddr: iscsiAddr, IQN: iqn}
|
|
replaced := false
|
|
for i := range entry.Replicas {
|
|
if entry.Replicas[i].Server == server {
|
|
// Preserve existing health/LSN data.
|
|
info.HealthScore = entry.Replicas[i].HealthScore
|
|
info.WALHeadLSN = entry.Replicas[i].WALHeadLSN
|
|
info.DataAddr = entry.Replicas[i].DataAddr
|
|
info.CtrlAddr = entry.Replicas[i].CtrlAddr
|
|
entry.Replicas[i] = info
|
|
replaced = true
|
|
break
|
|
}
|
|
}
|
|
if !replaced {
|
|
entry.Replicas = append(entry.Replicas, info)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ClearReplica removes all replica info for a registered volume.
|
|
// Deprecated: use RemoveReplica for new code. This method clears both scalar and Replicas[].
|
|
func (r *BlockVolumeRegistry) ClearReplica(name string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return fmt.Errorf("block volume %q not found", name)
|
|
}
|
|
if entry.ReplicaServer != "" {
|
|
r.removeFromServer(entry.ReplicaServer, name)
|
|
}
|
|
// Remove all replicas from byServer index.
|
|
for _, ri := range entry.Replicas {
|
|
if ri.Server != entry.ReplicaServer {
|
|
r.removeFromServer(ri.Server, name)
|
|
}
|
|
}
|
|
entry.ReplicaServer = ""
|
|
entry.ReplicaPath = ""
|
|
entry.ReplicaISCSIAddr = ""
|
|
entry.ReplicaIQN = ""
|
|
entry.ReplicaDataAddr = ""
|
|
entry.ReplicaCtrlAddr = ""
|
|
entry.Replicas = nil
|
|
return nil
|
|
}
|
|
|
|
// SwapPrimaryReplica promotes the replica to primary and clears the old replica.
// The old primary becomes the new replica (if it reconnects, rebuild will handle it).
// Epoch is atomically computed as entry.Epoch+1 inside the lock (R2-F5).
// Returns the new epoch for use in assignment messages.
//
// NOTE(review): this deprecated path only swaps the scalar replica fields;
// Replicas[] is not updated, so after the swap a ReplicaInfo may still list
// the new primary's server as a replica — confirm callers use scalars only.
// NOTE(review): the old primary is removed from byServer and not re-added even
// though it becomes ReplicaServer, so ListByServer(oldPrimary) will not return
// this volume — confirm this is intended until rebuild re-registers it.
func (r *BlockVolumeRegistry) SwapPrimaryReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return 0, fmt.Errorf("block volume %q not found", name)
	}
	if entry.ReplicaServer == "" {
		return 0, fmt.Errorf("block volume %q has no replica", name)
	}

	// Remove old primary from byServer index.
	r.removeFromServer(entry.VolumeServer, name)

	// Snapshot the old primary's identity before overwriting it.
	oldPrimaryServer := entry.VolumeServer
	oldPrimaryPath := entry.Path
	oldPrimaryIQN := entry.IQN
	oldPrimaryISCSI := entry.ISCSIAddr

	// Atomically bump epoch inside lock (R2-F5: prevents race with heartbeat updates).
	newEpoch := entry.Epoch + 1

	// Promote replica to primary.
	entry.VolumeServer = entry.ReplicaServer
	entry.Path = entry.ReplicaPath
	entry.IQN = entry.ReplicaIQN
	entry.ISCSIAddr = entry.ReplicaISCSIAddr
	entry.Epoch = newEpoch
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary) // R2-F3
	entry.LastLeaseGrant = time.Now()

	// Old primary becomes stale replica (will be rebuilt when it reconnects).
	// Its WAL receiver addresses are unknown until then, so clear them.
	entry.ReplicaServer = oldPrimaryServer
	entry.ReplicaPath = oldPrimaryPath
	entry.ReplicaIQN = oldPrimaryIQN
	entry.ReplicaISCSIAddr = oldPrimaryISCSI
	entry.ReplicaDataAddr = ""
	entry.ReplicaCtrlAddr = ""

	// Update byServer index: new primary server now hosts this volume.
	r.addToServer(entry.VolumeServer, name)
	return newEpoch, nil
}
|
|
|
|
// AddReplica adds or replaces a replica in the Replicas slice (by server).
|
|
// Also updates the byServer index and deprecated scalar fields for backward compat.
|
|
func (r *BlockVolumeRegistry) AddReplica(name string, info ReplicaInfo) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return fmt.Errorf("block volume %q not found", name)
|
|
}
|
|
// Replace if same server already exists.
|
|
replaced := false
|
|
for i := range entry.Replicas {
|
|
if entry.Replicas[i].Server == info.Server {
|
|
entry.Replicas[i] = info
|
|
replaced = true
|
|
break
|
|
}
|
|
}
|
|
if !replaced {
|
|
entry.Replicas = append(entry.Replicas, info)
|
|
}
|
|
r.addToServer(info.Server, name)
|
|
|
|
// Sync deprecated scalar fields (first replica → scalar).
|
|
if len(entry.Replicas) > 0 {
|
|
r0 := &entry.Replicas[0]
|
|
entry.ReplicaServer = r0.Server
|
|
entry.ReplicaPath = r0.Path
|
|
entry.ReplicaISCSIAddr = r0.ISCSIAddr
|
|
entry.ReplicaIQN = r0.IQN
|
|
entry.ReplicaDataAddr = r0.DataAddr
|
|
entry.ReplicaCtrlAddr = r0.CtrlAddr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RemoveReplica removes a replica by server address.
|
|
func (r *BlockVolumeRegistry) RemoveReplica(name, server string) error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
entry, ok := r.volumes[name]
|
|
if !ok {
|
|
return fmt.Errorf("block volume %q not found", name)
|
|
}
|
|
found := false
|
|
newReplicas := make([]ReplicaInfo, 0, len(entry.Replicas))
|
|
for _, ri := range entry.Replicas {
|
|
if ri.Server == server {
|
|
found = true
|
|
r.removeFromServer(server, name)
|
|
continue
|
|
}
|
|
newReplicas = append(newReplicas, ri)
|
|
}
|
|
if !found {
|
|
return fmt.Errorf("replica on %q not found for volume %q", server, name)
|
|
}
|
|
entry.Replicas = newReplicas
|
|
|
|
// Sync deprecated scalar fields.
|
|
if len(entry.Replicas) > 0 {
|
|
r0 := &entry.Replicas[0]
|
|
entry.ReplicaServer = r0.Server
|
|
entry.ReplicaPath = r0.Path
|
|
entry.ReplicaISCSIAddr = r0.ISCSIAddr
|
|
entry.ReplicaIQN = r0.IQN
|
|
entry.ReplicaDataAddr = r0.DataAddr
|
|
entry.ReplicaCtrlAddr = r0.CtrlAddr
|
|
} else {
|
|
entry.ReplicaServer = ""
|
|
entry.ReplicaPath = ""
|
|
entry.ReplicaISCSIAddr = ""
|
|
entry.ReplicaIQN = ""
|
|
entry.ReplicaDataAddr = ""
|
|
entry.ReplicaCtrlAddr = ""
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetPromotionLSNTolerance configures the max WAL LSN lag for promotion eligibility.
|
|
func (r *BlockVolumeRegistry) SetPromotionLSNTolerance(tolerance uint64) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.promotionLSNTolerance = tolerance
|
|
}
|
|
|
|
// PromotionLSNTolerance returns the current promotion LSN tolerance.
|
|
func (r *BlockVolumeRegistry) PromotionLSNTolerance() uint64 {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return r.promotionLSNTolerance
|
|
}
|
|
|
|
// PromoteBestReplica promotes the best eligible replica to primary.
// Eligibility gates: heartbeat fresh (within 2×LeaseTTL), WALHeadLSN within
// tolerance of the primary's, and role must be RoleReplica (not RoleRebuilding).
// Among eligible replicas the winner has the highest HealthScore, ties broken
// by highest WALHeadLSN, then earliest slice position.
// The promoted replica is removed from Replicas[]. Other replicas stay.
// Old primary is NOT added to Replicas (needs rebuild).
// Returns the new epoch.
func (r *BlockVolumeRegistry) PromoteBestReplica(name string) (uint64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	entry, ok := r.volumes[name]
	if !ok {
		return 0, fmt.Errorf("block volume %q not found", name)
	}
	if len(entry.Replicas) == 0 {
		return 0, fmt.Errorf("block volume %q has no replicas", name)
	}

	// Filter eligible replicas.
	now := time.Now()
	freshnessCutoff := 2 * entry.LeaseTTL
	if freshnessCutoff == 0 {
		freshnessCutoff = 60 * time.Second // default if LeaseTTL not set
	}
	primaryLSN := entry.WALHeadLSN

	bestIdx := -1
	for i := range entry.Replicas {
		ri := &entry.Replicas[i]
		// Gate 1: heartbeat freshness. A zero LastHeartbeat (never seen) is
		// not disqualifying here.
		if !ri.LastHeartbeat.IsZero() && now.Sub(ri.LastHeartbeat) > freshnessCutoff {
			continue
		}
		// Gate 2: WAL LSN recency (skip if primary LSN is 0 — no data yet, all eligible).
		if primaryLSN > 0 && ri.WALHeadLSN+r.promotionLSNTolerance < primaryLSN {
			continue
		}
		// Gate 3: role must be RoleReplica (not rebuilding/stale). Role 0
		// (unreported) is treated as eligible.
		if ri.Role != 0 && blockvol.RoleFromWire(ri.Role) != blockvol.RoleReplica {
			continue
		}
		// Eligible — pick best by health score, tie-break by WALHeadLSN.
		if bestIdx == -1 {
			bestIdx = i
		} else if ri.HealthScore > entry.Replicas[bestIdx].HealthScore {
			bestIdx = i
		} else if ri.HealthScore == entry.Replicas[bestIdx].HealthScore &&
			ri.WALHeadLSN > entry.Replicas[bestIdx].WALHeadLSN {
			bestIdx = i
		}
	}

	if bestIdx == -1 {
		return 0, fmt.Errorf("block volume %q: no eligible replicas for promotion", name)
	}

	// Copy by value before mutating the slice below.
	promoted := entry.Replicas[bestIdx]

	// Remove old primary from byServer index.
	r.removeFromServer(entry.VolumeServer, name)

	// Bump epoch atomically (inside the lock).
	newEpoch := entry.Epoch + 1

	// Promote replica to primary.
	entry.VolumeServer = promoted.Server
	entry.Path = promoted.Path
	entry.IQN = promoted.IQN
	entry.ISCSIAddr = promoted.ISCSIAddr
	entry.Epoch = newEpoch
	entry.Role = blockvol.RoleToWire(blockvol.RolePrimary)
	entry.LastLeaseGrant = time.Now()

	// Remove promoted from Replicas. Others stay.
	entry.Replicas = append(entry.Replicas[:bestIdx], entry.Replicas[bestIdx+1:]...)

	// Sync deprecated scalar fields.
	if len(entry.Replicas) > 0 {
		r0 := &entry.Replicas[0]
		entry.ReplicaServer = r0.Server
		entry.ReplicaPath = r0.Path
		entry.ReplicaISCSIAddr = r0.ISCSIAddr
		entry.ReplicaIQN = r0.IQN
		entry.ReplicaDataAddr = r0.DataAddr
		entry.ReplicaCtrlAddr = r0.CtrlAddr
	} else {
		entry.ReplicaServer = ""
		entry.ReplicaPath = ""
		entry.ReplicaISCSIAddr = ""
		entry.ReplicaIQN = ""
		entry.ReplicaDataAddr = ""
		entry.ReplicaCtrlAddr = ""
	}

	// Update byServer index: new primary server now hosts this volume.
	r.addToServer(entry.VolumeServer, name)

	return newEpoch, nil
}
|
|
|
|
// MarkBlockCapable records that the given server supports block volumes.
|
|
func (r *BlockVolumeRegistry) MarkBlockCapable(server string) {
|
|
r.mu.Lock()
|
|
r.blockServers[server] = true
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
// UnmarkBlockCapable removes a server from the block-capable set.
|
|
func (r *BlockVolumeRegistry) UnmarkBlockCapable(server string) {
|
|
r.mu.Lock()
|
|
delete(r.blockServers, server)
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
// LeaseGrant holds the minimal fields for a lease renewal sent to a primary:
// the volume's file path, current epoch, wire role, and TTL in milliseconds.
type LeaseGrant struct {
	Path       string
	Epoch      uint64
	Role       uint32
	LeaseTtlMs uint32
}
|
|
|
|
// LeaseGrants generates lightweight lease renewals for all active primary
|
|
// volumes on a server. Only primaries need lease renewal — replicas are passive
|
|
// WAL receivers without a write lease. Grants carry path + epoch + role + TTL
|
|
// and are processed by HandleAssignment's same-role refresh path, which
|
|
// validates the epoch and calls lease.Grant().
|
|
// Volumes with a pending assignment are excluded (the full assignment handles lease).
|
|
func (r *BlockVolumeRegistry) LeaseGrants(server string, pendingPaths map[string]bool) []LeaseGrant {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
names, ok := r.byServer[server]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
var grants []LeaseGrant
|
|
for name := range names {
|
|
e := r.volumes[name]
|
|
if e == nil || e.Status != StatusActive {
|
|
continue
|
|
}
|
|
// Only primaries need lease renewal. Replicas are passive WAL receivers
|
|
// and don't hold a write lease.
|
|
if blockvol.RoleFromWire(e.Role) != blockvol.RolePrimary {
|
|
continue
|
|
}
|
|
// Primary must be on this server.
|
|
if e.VolumeServer != server {
|
|
continue
|
|
}
|
|
if pendingPaths[e.Path] {
|
|
continue
|
|
}
|
|
grants = append(grants, LeaseGrant{
|
|
Path: e.Path,
|
|
Epoch: e.Epoch,
|
|
Role: e.Role,
|
|
LeaseTtlMs: blockvol.LeaseTTLToWire(e.LeaseTTL),
|
|
})
|
|
}
|
|
return grants
|
|
}
|
|
|
|
// ListAll returns all registered block volume entries, sorted by name.
|
|
func (r *BlockVolumeRegistry) ListAll() []*BlockVolumeEntry {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
entries := make([]*BlockVolumeEntry, 0, len(r.volumes))
|
|
for _, e := range r.volumes {
|
|
entries = append(entries, e)
|
|
}
|
|
sort.Slice(entries, func(i, j int) bool { return entries[i].Name < entries[j].Name })
|
|
return entries
|
|
}
|
|
|
|
// BlockServerSummary summarizes a block-capable volume server:
// its address, how many block volumes it hosts (as primary or replica),
// and whether it is known to support block volumes.
type BlockServerSummary struct {
	Address      string
	VolumeCount  int
	BlockCapable bool
}
|
|
|
|
// ServerSummaries returns a summary for each block-capable server.
|
|
func (r *BlockVolumeRegistry) ServerSummaries() []BlockServerSummary {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
summaries := make([]BlockServerSummary, 0, len(r.blockServers))
|
|
for addr := range r.blockServers {
|
|
count := 0
|
|
if names, ok := r.byServer[addr]; ok {
|
|
count = len(names)
|
|
}
|
|
summaries = append(summaries, BlockServerSummary{
|
|
Address: addr,
|
|
VolumeCount: count,
|
|
BlockCapable: true,
|
|
})
|
|
}
|
|
sort.Slice(summaries, func(i, j int) bool { return summaries[i].Address < summaries[j].Address })
|
|
return summaries
|
|
}
|
|
|
|
// BlockCapableServers returns the list of servers known to support block volumes.
|
|
func (r *BlockVolumeRegistry) BlockCapableServers() []string {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
servers := make([]string, 0, len(r.blockServers))
|
|
for s := range r.blockServers {
|
|
servers = append(servers, s)
|
|
}
|
|
return servers
|
|
}
|
|
|
|
// MaxBarrierLagLSN returns the maximum WAL lag across all volumes and replicas.
|
|
// This is the primary durability-risk metric: primary WALHeadLSN minus
|
|
// replica's WALHeadLSN. SLO threshold: < 100 under normal load.
|
|
func (r *BlockVolumeRegistry) MaxBarrierLagLSN() uint64 {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
var maxLag uint64
|
|
for _, entry := range r.volumes {
|
|
for _, ri := range entry.Replicas {
|
|
if ri.WALLag > maxLag {
|
|
maxLag = ri.WALLag
|
|
}
|
|
}
|
|
}
|
|
return maxLag
|
|
}
|
|
|
|
// AssignmentQueueDepth returns the total number of pending assignments across all servers.
// The registry does not track assignments; depth is owned by the assignment
// queue elsewhere, so this placeholder always reports 0.
func (r *BlockVolumeRegistry) AssignmentQueueDepth() int {
	// Delegated to the assignment queue, not the registry.
	// Placeholder: the queue tracks its own depth.
	return 0
}
|
|
|
|
// nameFromPath extracts the volume name from a .blk file path.
// e.g. "/opt/data/block/my-volume.blk" -> "my-volume".
// If the path does not end in ".blk", the bare base name is returned.
// (strings.TrimSuffix is already a no-op when the suffix is absent, so no
// separate HasSuffix check is needed.)
func nameFromPath(path string) string {
	return strings.TrimSuffix(filepath.Base(path), ".blk")
}
|
|
|