diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go
index 28063eb57..6289dac53 100644
--- a/weed/server/master_block_registry.go
+++ b/weed/server/master_block_registry.go
@@ -575,19 +575,46 @@ func (r *BlockVolumeRegistry) reconcileOnRestart(name string, existing *BlockVol
 		// New server has a lower epoch — add as replica.
 		glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d < existing %s epoch %d, adding as replica",
 			name, newServer, newEpoch, existing.VolumeServer, oldEpoch)
-		r.addServerAsReplica(name, existing, newServer, info)
+		r.upsertServerAsReplica(name, existing, newServer, info)
 		return
 	}
 
-	// Same epoch — ambiguous. Use WALHeadLSN as heuristic tiebreaker.
-	if info.WalHeadLsn > existing.WALHeadLSN {
-		glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — new server %s LSN %d > existing %s LSN %d, promoting new (heuristic)",
-			name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN)
+	// Same epoch — trust reported roles first.
+	existingIsPrimary := existing.Role == blockvol.RoleToWire(blockvol.RolePrimary)
+	newIsPrimary := info.Role == blockvol.RoleToWire(blockvol.RolePrimary)
+
+	if existingIsPrimary && !newIsPrimary {
+		// Existing claims primary, new claims replica — trust roles.
+		glog.V(0).Infof("block registry: reconcile %q: same epoch %d, existing %s is primary, new %s is replica — keeping existing",
+			name, newEpoch, existing.VolumeServer, newServer)
+		r.upsertServerAsReplica(name, existing, newServer, info)
+		return
+	}
+	if !existingIsPrimary && newIsPrimary {
+		// New claims primary, existing is not — trust roles.
+		glog.V(0).Infof("block registry: reconcile %q: same epoch %d, new %s claims primary, existing %s does not — promoting new",
+			name, newEpoch, newServer, existing.VolumeServer)
 		r.demoteExistingToReplica(name, existing, newServer, info)
+		return
+	}
+
+	// Both claim primary or both claim replica — ambiguous. Use WALHeadLSN as heuristic.
+	if newIsPrimary {
+		// Both claim primary.
+		if info.WalHeadLsn > existing.WALHeadLSN {
+			glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, both primary — new %s LSN %d > existing %s LSN %d, promoting new (heuristic)",
+				name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN)
+			r.demoteExistingToReplica(name, existing, newServer, info)
+		} else {
+			glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, both primary — existing %s LSN %d >= new %s LSN %d, keeping existing (heuristic)",
+				name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn)
+			r.upsertServerAsReplica(name, existing, newServer, info)
+		}
 	} else {
-		glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — existing %s LSN %d >= new server %s LSN %d, keeping existing (heuristic)",
-			name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn)
-		r.addServerAsReplica(name, existing, newServer, info)
+		// Both claim replica — no primary known. Keep existing, log ambiguity.
+		glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, neither claims primary — keeping existing %s, adding new %s as replica",
+			name, newEpoch, existing.VolumeServer, newServer)
+		r.upsertServerAsReplica(name, existing, newServer, info)
 	}
 }
 
@@ -637,9 +664,26 @@ func (r *BlockVolumeRegistry) demoteExistingToReplica(name string, existing *Blo
 	}
 }
 
-// addServerAsReplica adds the new server as a replica for the existing entry.
+// upsertServerAsReplica adds or updates the server as a replica for the existing entry.
+// If the server already exists in Replicas[], its fields are updated instead of appending
+// a duplicate. This prevents duplicate replica entries during restart/replay windows.
 // Caller must hold r.mu.
-func (r *BlockVolumeRegistry) addServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
+func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
+	// Check for existing replica entry for this server.
+	for i := range existing.Replicas {
+		if existing.Replicas[i].Server == newServer {
+			// Update in place.
+			existing.Replicas[i].Path = info.Path
+			existing.Replicas[i].HealthScore = info.HealthScore
+			existing.Replicas[i].WALHeadLSN = info.WalHeadLsn
+			existing.Replicas[i].LastHeartbeat = time.Now()
+			existing.Replicas[i].Role = info.Role
+			existing.Replicas[i].NvmeAddr = info.NvmeAddr
+			existing.Replicas[i].NQN = info.Nqn
+			return
+		}
+	}
+	// New replica — append.
 	ri := ReplicaInfo{
 		Server: newServer,
 		Path: info.Path,
diff --git a/weed/server/master_block_registry_test.go b/weed/server/master_block_registry_test.go
index 2389bc787..cd3ed34c4 100644
--- a/weed/server/master_block_registry_test.go
+++ b/weed/server/master_block_registry_test.go
@@ -1748,6 +1748,60 @@ func TestMasterRestart_ReplicaHeartbeat_AddedCorrectly(t *testing.T) {
 	}
 }
 
+func TestMasterRestart_SameEpoch_RoleTrusted(t *testing.T) {
+	r := NewBlockVolumeRegistry()
+
+	// First heartbeat: vs1 claims primary, epoch 5, LSN 50.
+	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
+		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 50, VolumeSize: 1 << 30},
+	}, "")
+
+	// Second heartbeat: vs2 claims replica (Role=2), same epoch 5, HIGHER LSN.
+	// Even though LSN is higher, it reports Role=replica, so it should NOT become primary.
+	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
+		{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 200, VolumeSize: 1 << 30},
+	}, "")
+
+	entry, _ := r.Lookup("vol1")
+	if entry.VolumeServer != "vs1:9333" {
+		t.Fatalf("expected vs1 (claims primary) to stay primary, got %q", entry.VolumeServer)
+	}
+	if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" {
+		t.Fatalf("expected vs2 as replica, got %+v", entry.Replicas)
+	}
+}
+
+func TestMasterRestart_DuplicateReplicaHeartbeat_NoDuplicate(t *testing.T) {
+	r := NewBlockVolumeRegistry()
+
+	// Primary heartbeat.
+	r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
+		{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
+	}, "")
+
+	// Replica heartbeat — first time.
+	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
+		{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 90, VolumeSize: 1 << 30, HealthScore: 0.8},
+	}, "")
+
+	// Same replica heartbeat again — should update, not duplicate.
+	r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
+		{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 95, VolumeSize: 1 << 30, HealthScore: 0.9},
+	}, "")
+
+	entry, _ := r.Lookup("vol1")
+	if len(entry.Replicas) != 1 {
+		t.Fatalf("expected 1 replica (no duplicates), got %d", len(entry.Replicas))
+	}
+	// Should have the updated values from the second heartbeat.
+	if entry.Replicas[0].WALHeadLSN != 95 {
+		t.Fatalf("expected updated LSN 95, got %d", entry.Replicas[0].WALHeadLSN)
+	}
+	if entry.Replicas[0].HealthScore != 0.9 {
+		t.Fatalf("expected updated health 0.9, got %f", entry.Replicas[0].HealthScore)
+	}
+}
+
 // --- Copy semantics tests (pointer escape fix) ---
 
 func TestLookup_ReturnsCopy(t *testing.T) {