From 9137fa648615c5f15c9d834ec88d2eab3ebec6f7 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Tue, 24 Mar 2026 01:17:51 -0700 Subject: [PATCH] fix: epoch-based reconciliation on master restart reconstruction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a second server reports the same volume during master restart, UpdateFullHeartbeat now uses epoch-based tie-breaking instead of first-heartbeat-wins: 1. Higher epoch wins as primary — old entry demoted to replica 2. Same epoch — higher WALHeadLSN wins (heuristic, warning logged) 3. Lower epoch — added as replica Applied in both code paths: the auto-register branch (no entry exists yet for this name) and the unlinked-server branch (entry exists but this server is not in it). This is a deterministic reconstruction improvement, not ground truth. The long-term fix is persisting authoritative volume state. 5 new tests covering all reconciliation scenarios. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/master_block_registry.go | 144 ++++++++++++++++++---- weed/server/master_block_registry_test.go | 132 +++++++++++++++++++- 2 files changed, 251 insertions(+), 25 deletions(-) diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 17c4634c7..28063eb57 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -499,28 +499,10 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master } } } else { - // Fix #3: Server reports a volume that exists but has no record of this server. - // This happens after master restart: primary heartbeat re-created the entry, - // but replica heartbeat arrives and isn't linked. Add as replica. - ri := ReplicaInfo{ - Server: server, - Path: info.Path, - HealthScore: info.HealthScore, - WALHeadLSN: info.WalHeadLsn, - LastHeartbeat: time.Now(), - Role: info.Role, - NvmeAddr: info.NvmeAddr, - NQN: info.Nqn, - } - existing.Replicas = append(existing.Replicas, ri) - r.addToServer(server, existingName) - // Sync deprecated scalar fields. - if len(existing.Replicas) == 1 { - existing.ReplicaServer = ri.Server - existing.ReplicaPath = ri.Path - } - glog.V(0).Infof("block registry: attached replica %s for %q from heartbeat (path=%s)", - server, existingName, info.Path) + // Server reports a volume that exists but has no record of this server. + // This happens after master restart. Use epoch-based reconciliation + // to determine if the new server should be primary or replica. + r.reconcileOnRestart(existingName, existing, server, info) } } else { // Auto-register volumes reported by heartbeat but not in registry. @@ -529,7 +511,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if name == "" { continue } - if _, dup := r.volumes[name]; !dup { + existing, dup := r.volumes[name] + if !dup { entry := &BlockVolumeEntry{ Name: name, VolumeServer: server, @@ -550,18 +533,131 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if info.ReplicaCtrlAddr != "" { entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr } - // NVMe publication: propagate NVMe fields from heartbeat. entry.NvmeAddr = info.NvmeAddr entry.NQN = info.Nqn r.volumes[name] = entry r.addToServer(server, name) glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)", name, server, info.Path, info.VolumeSize) + } else { + // Reconcile: a second server reports the same volume during restart reconstruction. + r.reconcileOnRestart(name, existing, server, info) } } } } +// reconcileOnRestart handles the case where a second server reports a volume +// name that already exists in the registry during master restart reconstruction. +// Uses epoch-based tie-breaking to determine who is the real primary. +// +// Rules: +// 1. Higher epoch wins as primary — the old entry becomes a replica. +// 2. Same epoch, both claim primary — higher WALHeadLSN wins (heuristic). +// A warning is logged because this is an ambiguous case. +// 3. Lower epoch — new server is added as replica. +// +// Caller must hold r.mu (write lock). +func (r *BlockVolumeRegistry) reconcileOnRestart(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) { + newEpoch := info.Epoch + oldEpoch := existing.Epoch + + if newEpoch > oldEpoch { + // New server has a higher epoch — it is the authoritative primary. + // Demote the current entry to a replica. + glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d > existing %s epoch %d, promoting new", + name, newServer, newEpoch, existing.VolumeServer, oldEpoch) + r.demoteExistingToReplica(name, existing, newServer, info) + return + } + + if newEpoch < oldEpoch { + // New server has a lower epoch — add as replica. + glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d < existing %s epoch %d, adding as replica", + name, newServer, newEpoch, existing.VolumeServer, oldEpoch) + r.addServerAsReplica(name, existing, newServer, info) + return + } + + // Same epoch — ambiguous. Use WALHeadLSN as heuristic tiebreaker. + if info.WalHeadLsn > existing.WALHeadLSN { + glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — new server %s LSN %d > existing %s LSN %d, promoting new (heuristic)", + name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN) + r.demoteExistingToReplica(name, existing, newServer, info) + } else { + glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — existing %s LSN %d >= new server %s LSN %d, keeping existing (heuristic)", + name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn) + r.addServerAsReplica(name, existing, newServer, info) + } +} + +// demoteExistingToReplica swaps the primary: new server becomes primary, +// old primary becomes a replica. Called during restart reconciliation. +// Caller must hold r.mu. +func (r *BlockVolumeRegistry) demoteExistingToReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) { + oldServer := existing.VolumeServer + oldPath := existing.Path + + // Save old primary as replica. + oldReplica := ReplicaInfo{ + Server: oldServer, + Path: oldPath, + ISCSIAddr: existing.ISCSIAddr, + IQN: existing.IQN, + NvmeAddr: existing.NvmeAddr, + NQN: existing.NQN, + HealthScore: existing.HealthScore, + WALHeadLSN: existing.WALHeadLSN, + LastHeartbeat: existing.LastLeaseGrant, + Role: blockvol.RoleToWire(blockvol.RoleReplica), + } + + // Update entry to reflect new primary. + existing.VolumeServer = newServer + existing.Path = info.Path + existing.Epoch = info.Epoch + existing.Role = info.Role + existing.HealthScore = info.HealthScore + existing.WALHeadLSN = info.WalHeadLsn + existing.LastLeaseGrant = time.Now() + if info.DurabilityMode != "" { + existing.DurabilityMode = info.DurabilityMode + } + existing.NvmeAddr = info.NvmeAddr + existing.NQN = info.Nqn + + // Add old primary as replica. + existing.Replicas = append(existing.Replicas, oldReplica) + r.addToServer(newServer, name) + + // Sync deprecated scalar fields. + if len(existing.Replicas) == 1 { + existing.ReplicaServer = oldReplica.Server + existing.ReplicaPath = oldReplica.Path + } +} + +// addServerAsReplica adds the new server as a replica for the existing entry. +// Caller must hold r.mu. +func (r *BlockVolumeRegistry) addServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) { + ri := ReplicaInfo{ + Server: newServer, + Path: info.Path, + HealthScore: info.HealthScore, + WALHeadLSN: info.WalHeadLsn, + LastHeartbeat: time.Now(), + Role: info.Role, + NvmeAddr: info.NvmeAddr, + NQN: info.Nqn, + } + existing.Replicas = append(existing.Replicas, ri) + r.addToServer(newServer, name) + if len(existing.Replicas) == 1 { + existing.ReplicaServer = ri.Server + existing.ReplicaPath = ri.Path + } +} + // UpdateDeltaHeartbeat processes incremental new/deleted block volumes. // Called on subsequent heartbeats (not the first). func (r *BlockVolumeRegistry) UpdateDeltaHeartbeat(server string, added []*master_pb.BlockVolumeShortInfoMessage, removed []*master_pb.BlockVolumeShortInfoMessage) { diff --git a/weed/server/master_block_registry_test.go b/weed/server/master_block_registry_test.go index ce239fb90..2389bc787 100644 --- a/weed/server/master_block_registry_test.go +++ b/weed/server/master_block_registry_test.go @@ -802,15 +802,18 @@ func TestRegistry_ReplicaHeartbeat_StaleReplicaRemoved(t *testing.T) { // Fix #3: Replica heartbeat after master restart reconstructs ReplicaInfo. func TestRegistry_ReplicaHeartbeat_ReconstructsAfterRestart(t *testing.T) { r := NewBlockVolumeRegistry() - // Simulate master restart: primary heartbeat re-created entry without replicas. + // Simulate master restart: primary heartbeat re-created entry (epoch 1). r.Register(&BlockVolumeEntry{ Name: "vol1", VolumeServer: "primary", Path: "/data/vol1.blk", + Epoch: 1, + WALHeadLSN: 100, Status: StatusActive, }) // Replica heartbeat arrives — vol1 exists but has no record of this server. + // Same epoch, lower LSN, Role=2 (replica) → added as replica. r.UpdateFullHeartbeat("replica1", []*master_pb.BlockVolumeInfoMessage{ {Path: "/data/vol1.blk", Epoch: 1, Role: 2, HealthScore: 0.95, WalHeadLsn: 42}, }, "") @@ -1618,6 +1621,133 @@ func TestRegistry_ManualPromote_Force_StillRejectsDeadServer(t *testing.T) { } } +// --- Master restart reconciliation tests --- + +func TestMasterRestart_HigherEpochWins(t *testing.T) { + r := NewBlockVolumeRegistry() + + // First heartbeat from stale primary (epoch 5). + r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + entry, ok := r.Lookup("vol1") + if !ok { + t.Fatal("vol1 not found after first heartbeat") + } + if entry.VolumeServer != "vs1:9333" { + t.Fatalf("expected vs1 as initial primary, got %q", entry.VolumeServer) + } + + // Second heartbeat from real primary (epoch 6 — post-failover). + r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 6, Role: 1, WalHeadLsn: 150, VolumeSize: 1 << 30}, + }, "") + + entry, _ = r.Lookup("vol1") + if entry.VolumeServer != "vs2:9333" { + t.Fatalf("expected vs2 (higher epoch) as primary, got %q", entry.VolumeServer) + } + if entry.Epoch != 6 { + t.Fatalf("expected epoch 6, got %d", entry.Epoch) + } + // Old primary should be a replica. + if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" { + t.Fatalf("expected vs1 demoted to replica, got replicas=%+v", entry.Replicas) + } +} + +func TestMasterRestart_LowerEpochBecomesReplica(t *testing.T) { + r := NewBlockVolumeRegistry() + + // First heartbeat from real primary (epoch 6). + r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 6, Role: 1, WalHeadLsn: 150, VolumeSize: 1 << 30}, + }, "") + + // Second heartbeat from stale server (epoch 5). + r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + entry, _ := r.Lookup("vol1") + if entry.VolumeServer != "vs2:9333" { + t.Fatalf("expected vs2 (higher epoch) to stay primary, got %q", entry.VolumeServer) + } + if entry.Epoch != 6 { + t.Fatalf("expected epoch 6, got %d", entry.Epoch) + } + if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" { + t.Fatalf("expected vs1 added as replica, got replicas=%+v", entry.Replicas) + } +} + +func TestMasterRestart_SameEpoch_HigherLSNWins(t *testing.T) { + r := NewBlockVolumeRegistry() + + // First heartbeat: epoch 5, LSN 100. + r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + // Second heartbeat: same epoch 5, higher LSN 200 — heuristic: this server is more recent. + r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 200, VolumeSize: 1 << 30}, + }, "") + + entry, _ := r.Lookup("vol1") + if entry.VolumeServer != "vs2:9333" { + t.Fatalf("expected vs2 (higher LSN) as primary, got %q", entry.VolumeServer) + } + if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs1:9333" { + t.Fatalf("expected vs1 demoted to replica, got replicas=%+v", entry.Replicas) + } +} + +func TestMasterRestart_SameEpoch_SameLSN_ExistingWins(t *testing.T) { + r := NewBlockVolumeRegistry() + + // First heartbeat: epoch 5, LSN 100. + r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + // Second heartbeat: same epoch 5, same LSN 100 — existing wins (deterministic). + r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + entry, _ := r.Lookup("vol1") + if entry.VolumeServer != "vs1:9333" { + t.Fatalf("expected vs1 (existing, same LSN) to stay primary, got %q", entry.VolumeServer) + } + if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" { + t.Fatalf("expected vs2 added as replica, got replicas=%+v", entry.Replicas) + } +} + +func TestMasterRestart_ReplicaHeartbeat_AddedCorrectly(t *testing.T) { + r := NewBlockVolumeRegistry() + + // Primary heartbeat first. + r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30}, + }, "") + + // Replica heartbeat: same volume, lower epoch (stale replica never got promoted). + r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{ + {Path: "/data/vol1.blk", Epoch: 4, Role: 2, WalHeadLsn: 90, VolumeSize: 1 << 30}, + }, "") + + entry, _ := r.Lookup("vol1") + if entry.VolumeServer != "vs1:9333" { + t.Fatalf("primary should be vs1, got %q", entry.VolumeServer) + } + if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" { + t.Fatalf("expected vs2 as replica, got %+v", entry.Replicas) + } +} + // --- Copy semantics tests (pointer escape fix) --- func TestLookup_ReturnsCopy(t *testing.T) {