Browse Source

fix: epoch-based reconciliation on master restart reconstruction

When a second server reports the same volume during master restart,
UpdateFullHeartbeat now uses epoch-based tie-breaking instead of
first-heartbeat-wins:

1. Higher epoch wins as primary — old entry demoted to replica
2. Same epoch — higher WALHeadLSN wins; a tie keeps the existing primary (heuristic, warning logged)
3. Lower epoch — added as replica

Applied in both code paths: the auto-register branch (no entry
exists yet for this name) and the unlinked-server branch (entry
exists but this server is not in it).

This is a deterministic reconstruction improvement, not ground
truth. The long-term fix is persisting authoritative volume state.

5 new tests covering all reconciliation scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
9137fa6486
  1. 144
      weed/server/master_block_registry.go
  2. 132
      weed/server/master_block_registry_test.go

144
weed/server/master_block_registry.go

@ -499,28 +499,10 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
} }
} }
} else { } else {
// Fix #3: Server reports a volume that exists but has no record of this server.
// This happens after master restart: primary heartbeat re-created the entry,
// but replica heartbeat arrives and isn't linked. Add as replica.
ri := ReplicaInfo{
Server: server,
Path: info.Path,
HealthScore: info.HealthScore,
WALHeadLSN: info.WalHeadLsn,
LastHeartbeat: time.Now(),
Role: info.Role,
NvmeAddr: info.NvmeAddr,
NQN: info.Nqn,
}
existing.Replicas = append(existing.Replicas, ri)
r.addToServer(server, existingName)
// Sync deprecated scalar fields.
if len(existing.Replicas) == 1 {
existing.ReplicaServer = ri.Server
existing.ReplicaPath = ri.Path
}
glog.V(0).Infof("block registry: attached replica %s for %q from heartbeat (path=%s)",
server, existingName, info.Path)
// Server reports a volume that exists but has no record of this server.
// This happens after master restart. Use epoch-based reconciliation
// to determine if the new server should be primary or replica.
r.reconcileOnRestart(existingName, existing, server, info)
} }
} else { } else {
// Auto-register volumes reported by heartbeat but not in registry. // Auto-register volumes reported by heartbeat but not in registry.
@ -529,7 +511,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
if name == "" { if name == "" {
continue continue
} }
if _, dup := r.volumes[name]; !dup {
existing, dup := r.volumes[name]
if !dup {
entry := &BlockVolumeEntry{ entry := &BlockVolumeEntry{
Name: name, Name: name,
VolumeServer: server, VolumeServer: server,
@ -550,18 +533,131 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
if info.ReplicaCtrlAddr != "" { if info.ReplicaCtrlAddr != "" {
entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr
} }
// NVMe publication: propagate NVMe fields from heartbeat.
entry.NvmeAddr = info.NvmeAddr entry.NvmeAddr = info.NvmeAddr
entry.NQN = info.Nqn entry.NQN = info.Nqn
r.volumes[name] = entry r.volumes[name] = entry
r.addToServer(server, name) r.addToServer(server, name)
glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)", glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
name, server, info.Path, info.VolumeSize) name, server, info.Path, info.VolumeSize)
} else {
// Reconcile: a second server reports the same volume during restart reconstruction.
r.reconcileOnRestart(name, existing, server, info)
} }
} }
} }
} }
// reconcileOnRestart handles the case where a second server reports a volume
// name that already exists in the registry during master restart reconstruction.
// It decides which of the two servers the entry records as primary; the other
// one ends up in existing.Replicas.
//
// Rules, applied in order:
//  1. Higher epoch wins as primary — the old entry becomes a replica.
//  2. Lower epoch — new server is added as replica.
//  3. Same epoch, the existing entry claims the primary role and the new
//     server does not — new server is added as replica regardless of LSN.
//     Heartbeats are sampled at different instants, so a replica can report a
//     higher WALHeadLSN than the value recorded earlier from its primary;
//     that alone must not promote it.
//  4. Same epoch otherwise — higher WALHeadLSN wins (heuristic), a tie keeps
//     the existing primary. A warning is logged because this is an ambiguous
//     case.
//
// NOTE(review): the mirror of rule 3 (only the new server claims primary)
// still falls through to the LSN heuristic, because existing.Role may be unset
// for entries that were not built from a heartbeat — confirm before tightening.
//
// Parameters:
//   - name: registry key of the volume being reconciled (used for logging and
//     for the server index).
//   - existing: the entry currently stored under name; mutated in place.
//   - newServer: the server whose heartbeat reported the volume.
//   - info: the heartbeat message from newServer for this volume.
//
// Caller must hold r.mu (write lock).
func (r *BlockVolumeRegistry) reconcileOnRestart(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
	newEpoch := info.Epoch
	oldEpoch := existing.Epoch

	switch {
	case newEpoch > oldEpoch:
		// New server has a higher epoch — it is the authoritative primary.
		// Demote the current entry to a replica.
		glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d > existing %s epoch %d, promoting new",
			name, newServer, newEpoch, existing.VolumeServer, oldEpoch)
		r.demoteExistingToReplica(name, existing, newServer, info)
		return
	case newEpoch < oldEpoch:
		// New server has a lower epoch — add as replica.
		glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d < existing %s epoch %d, adding as replica",
			name, newServer, newEpoch, existing.VolumeServer, oldEpoch)
		r.addServerAsReplica(name, existing, newServer, info)
		return
	}

	// Same epoch from here on.
	//
	// If the recorded server claims primary and the reporting server does not,
	// there is nothing ambiguous: the reporter is a replica. Without this check
	// a replica whose heartbeat carries a fresher LSN would displace its own
	// primary in the registry.
	primaryRole := blockvol.RoleToWire(blockvol.RolePrimary)
	if existing.Role == primaryRole && info.Role != primaryRole {
		glog.V(0).Infof("block registry: reconcile %q: same epoch %d, existing %s claims primary but new server %s reports role %d, adding as replica",
			name, newEpoch, existing.VolumeServer, newServer, info.Role)
		r.addServerAsReplica(name, existing, newServer, info)
		return
	}

	// Ambiguous. Use WALHeadLSN as heuristic tiebreaker; equal LSN keeps the
	// existing primary so the outcome stays deterministic.
	if info.WalHeadLsn > existing.WALHeadLSN {
		glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — new server %s LSN %d > existing %s LSN %d, promoting new (heuristic)",
			name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN)
		r.demoteExistingToReplica(name, existing, newServer, info)
		return
	}
	glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — existing %s LSN %d >= new server %s LSN %d, keeping existing (heuristic)",
		name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn)
	r.addServerAsReplica(name, existing, newServer, info)
}

// demoteExistingToReplica swaps the primary during restart reconciliation:
// newServer becomes the entry's primary and the previously recorded primary is
// appended to existing.Replicas with the replica role.
//
// The entry's VolumeServer, Path, Epoch, Role, HealthScore, WALHeadLSN,
// NvmeAddr and NQN are overwritten from newServer/info; DurabilityMode is
// overwritten only when info carries a non-empty value; LastLeaseGrant is
// reset to the current time. newServer is indexed under name via addToServer.
//
// NOTE(review): the entry's ISCSIAddr and IQN are copied into the demoted
// replica record but are not refreshed for the new primary here — confirm a
// later heartbeat path updates them.
//
// Caller must hold r.mu.
func (r *BlockVolumeRegistry) demoteExistingToReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
	// Snapshot the outgoing primary before any field is overwritten.
	demoted := ReplicaInfo{
		Server:      existing.VolumeServer,
		Path:        existing.Path,
		ISCSIAddr:   existing.ISCSIAddr,
		IQN:         existing.IQN,
		NvmeAddr:    existing.NvmeAddr,
		NQN:         existing.NQN,
		HealthScore: existing.HealthScore,
		WALHeadLSN:  existing.WALHeadLSN,
		// NOTE(review): the lease-grant time stands in for a heartbeat
		// timestamp here — confirm this is the intended substitute.
		LastHeartbeat: existing.LastLeaseGrant,
		Role:          blockvol.RoleToWire(blockvol.RoleReplica),
	}

	// Rewrite the entry so it describes the new primary.
	existing.VolumeServer = newServer
	existing.Path = info.Path
	existing.Epoch = info.Epoch
	existing.Role = info.Role
	existing.HealthScore = info.HealthScore
	existing.WALHeadLSN = info.WalHeadLsn
	existing.LastLeaseGrant = time.Now()
	if info.DurabilityMode != "" {
		existing.DurabilityMode = info.DurabilityMode
	}
	existing.NvmeAddr = info.NvmeAddr
	existing.NQN = info.Nqn

	// Attach the old primary as a replica and index the new primary.
	existing.Replicas = append(existing.Replicas, demoted)
	r.addToServer(newServer, name)

	// Sync deprecated scalar fields; they mirror the replica only when it is
	// the sole one.
	if len(existing.Replicas) == 1 {
		existing.ReplicaServer = demoted.Server
		existing.ReplicaPath = demoted.Path
	}
}

// addServerAsReplica appends newServer to existing.Replicas, populated from the
// heartbeat in info and stamped with the current time, and indexes newServer
// under name via addToServer. The entry's primary fields are left untouched.
// No check is made for an already-present replica with the same server.
//
// Caller must hold r.mu.
func (r *BlockVolumeRegistry) addServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
	attached := ReplicaInfo{
		Server:        newServer,
		Path:          info.Path,
		HealthScore:   info.HealthScore,
		WALHeadLSN:    info.WalHeadLsn,
		LastHeartbeat: time.Now(),
		Role:          info.Role,
		NvmeAddr:      info.NvmeAddr,
		NQN:           info.Nqn,
	}
	existing.Replicas = append(existing.Replicas, attached)
	r.addToServer(newServer, name)

	// Sync deprecated scalar fields; they mirror the replica only when it is
	// the sole one.
	if len(existing.Replicas) == 1 {
		existing.ReplicaServer = attached.Server
		existing.ReplicaPath = attached.Path
	}
}
// UpdateDeltaHeartbeat processes incremental new/deleted block volumes. // UpdateDeltaHeartbeat processes incremental new/deleted block volumes.
// Called on subsequent heartbeats (not the first). // Called on subsequent heartbeats (not the first).
func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) { func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) {

132
weed/server/master_block_registry_test.go

@ -802,15 +802,18 @@ func TestRegistry_ReplicaHeartbeat_StaleReplicaRemoved(t *testing.T) {
// Fix #3: Replica heartbeat after master restart reconstructs ReplicaInfo. // Fix #3: Replica heartbeat after master restart reconstructs ReplicaInfo.
func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) { func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) {
r := NewBlockVolumeRegistry() r := NewBlockVolumeRegistry()
// Simulate master restart: primary heartbeat re-created entry without replicas.
// Simulate master restart: primary heartbeat re-created entry (epoch 1).
r.Register(&BlockVolumeEntry{ r.Register(&BlockVolumeEntry{
Name: "vol1", Name: "vol1",
VolumeServer: "primary", VolumeServer: "primary",
Path: "/data/vol1.blk", Path: "/data/vol1.blk",
Epoch: 1,
WALHeadLSN: 100,
Status: StatusActive, Status: StatusActive,
}) })
// Replica heartbeat arrives — vol1 exists but has no record of this server. // Replica heartbeat arrives — vol1 exists but has no record of this server.
// Same epoch, lower LSN, Role=2 (replica) → added as replica.
r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{ r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42}, {Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42},
}, "") }, "")
@ -1618,6 +1621,133 @@ func TestRegistry_ManualPromote_Force_StillRejectsDeadServer(t *testing.T) {
} }
} }
// --- Master restart reconciliation tests ---
// TestMasterRestart_HigherEpochWins checks that when a stale primary reports
// first and a server with a higher epoch reports second, the higher-epoch
// server becomes the entry's primary and the first server is kept as the only
// replica.
func TestMasterRestart_HigherEpochWins(t *testing.T) {
	r := NewBlockVolumeRegistry()

	// First heartbeat from stale primary (epoch 5).
	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after first heartbeat")
	}
	if entry.VolumeServer != "vs1:9333" {
		t.Fatalf("expected vs1 as initial primary, got %q", entry.VolumeServer)
	}

	// Second heartbeat from real primary (epoch 6 — post-failover).
	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 6, Role: 1, WalHeadLsn: 150, VolumeSize: 1 << 30},
	}, "")
	// Check the found flag so a vanished entry fails with a clear message
	// instead of a confusing field mismatch.
	entry, ok = r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after second heartbeat")
	}
	if entry.VolumeServer != "vs2:9333" {
		t.Fatalf("expected vs2 (higher epoch) as primary, got %q", entry.VolumeServer)
	}
	if entry.Epoch != 6 {
		t.Fatalf("expected epoch 6, got %d", entry.Epoch)
	}
	// Old primary should be a replica.
	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" {
		t.Fatalf("expected vs1 demoted to replica, got replicas=%+v", entry.Replicas)
	}
}
// TestMasterRestart_LowerEpochBecomesReplica checks that a server reporting a
// lower epoch than the recorded primary is attached as a replica and that the
// primary and its epoch are left unchanged.
func TestMasterRestart_LowerEpochBecomesReplica(t *testing.T) {
	r := NewBlockVolumeRegistry()

	// First heartbeat from real primary (epoch 6).
	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 6, Role: 1, WalHeadLsn: 150, VolumeSize: 1 << 30},
	}, "")
	// Second heartbeat from stale server (epoch 5).
	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")

	// Check the found flag so a missing entry fails with a clear message.
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after reconciliation")
	}
	if entry.VolumeServer != "vs2:9333" {
		t.Fatalf("expected vs2 (higher epoch) to stay primary, got %q", entry.VolumeServer)
	}
	if entry.Epoch != 6 {
		t.Fatalf("expected epoch 6, got %d", entry.Epoch)
	}
	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" {
		t.Fatalf("expected vs1 added as replica, got replicas=%+v", entry.Replicas)
	}
}
// TestMasterRestart_SameEpoch_HigherLSNWins checks the same-epoch heuristic:
// when both servers report epoch 5 with the primary role, the one with the
// higher WalHeadLsn becomes primary and the other is kept as the only replica.
func TestMasterRestart_SameEpoch_HigherLSNWins(t *testing.T) {
	r := NewBlockVolumeRegistry()

	// First heartbeat: epoch 5, LSN 100.
	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")
	// Second heartbeat: same epoch 5, higher LSN 200 — heuristic: this server is more recent.
	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 200, VolumeSize: 1 << 30},
	}, "")

	// Check the found flag so a missing entry fails with a clear message.
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after reconciliation")
	}
	if entry.VolumeServer != "vs2:9333" {
		t.Fatalf("expected vs2 (higher LSN) as primary, got %q", entry.VolumeServer)
	}
	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" {
		t.Fatalf("expected vs1 demoted to replica, got replicas=%+v", entry.Replicas)
	}
}
// TestMasterRestart_SameEpoch_SameLSN_ExistingWins checks the full tie: with
// equal epoch and equal WalHeadLsn the first-reported server stays primary and
// the second is attached as the only replica.
func TestMasterRestart_SameEpoch_SameLSN_ExistingWins(t *testing.T) {
	r := NewBlockVolumeRegistry()

	// First heartbeat: epoch 5, LSN 100.
	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")
	// Second heartbeat: same epoch 5, same LSN 100 — existing wins (deterministic).
	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")

	// Check the found flag so a missing entry fails with a clear message.
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after reconciliation")
	}
	if entry.VolumeServer != "vs1:9333" {
		t.Fatalf("expected vs1 (existing, same LSN) to stay primary, got %q", entry.VolumeServer)
	}
	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" {
		t.Fatalf("expected vs2 added as replica, got replicas=%+v", entry.Replicas)
	}
}
// TestMasterRestart_ReplicaHeartbeat_AddedCorrectly checks that a heartbeat
// carrying the replica role and a lower epoch is attached as a replica of the
// already-registered primary.
func TestMasterRestart_ReplicaHeartbeat_AddedCorrectly(t *testing.T) {
	r := NewBlockVolumeRegistry()

	// Primary heartbeat first.
	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
	}, "")
	// Replica heartbeat: same volume, lower epoch (stale replica never got promoted).
	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
		{Path: "/data/vol1.blk", Epoch: 4, Role: 2, WalHeadLsn: 90, VolumeSize: 1 << 30},
	}, "")

	// Check the found flag so a missing entry fails with a clear message.
	entry, ok := r.Lookup("vol1")
	if !ok {
		t.Fatal("vol1 not found after reconciliation")
	}
	if entry.VolumeServer != "vs1:9333" {
		t.Fatalf("primary should be vs1, got %q", entry.VolumeServer)
	}
	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" {
		t.Fatalf("expected vs2 as replica, got %+v", entry.Replicas)
	}
}
// --- Copy semantics tests (pointer escape fix) --- // --- Copy semantics tests (pointer escape fix) ---
func TestLookup_ReturnsCopy(t *testing.T) { func TestLookup_ReturnsCopy(t *testing.T) {

Loading…
Cancel
Save