From a7b1b4cb22c73748cff02544830898b572e29671 Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Wed, 11 Mar 2026 15:33:45 -0700 Subject: [PATCH] fix: propagate NVMe fields through replica creation, heartbeat, and promotion ReplicaInfo now carries NvmeAddr/NQN. Fields are populated during replica allocation (tryCreateOneReplica), updated from replica heartbeats, and copied in PromoteBestReplica. This ensures master lookup returns correct NVMe endpoints immediately after failover, without waiting for the first post-promotion heartbeat. Co-Authored-By: Claude Opus 4.6 --- weed/server/master_block_registry.go | 15 ++++ weed/server/master_grpc_server_block.go | 2 + weed/server/master_grpc_server_block_test.go | 79 ++++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 33ce17645..5af63ce91 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -28,6 +28,8 @@ type ReplicaInfo struct { Path string // file path on replica VS ISCSIAddr string // iSCSI target address IQN string // iSCSI qualified name + NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled + NQN string // NVMe subsystem NQN, empty if NVMe disabled DataAddr string // WAL receiver data listen addr CtrlAddr string // WAL receiver ctrl listen addr HealthScore float64 // from heartbeat (0.0-1.0) @@ -336,6 +338,10 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if info.ReplicaCtrlAddr != "" { existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr } + // NVMe publication: update NVMe fields from heartbeat. + // Required for master restart reconstruction and NVMe enable/disable. + existing.NvmeAddr = info.NvmeAddr + existing.NQN = info.Nqn // Sync first replica's data addrs to Replicas[]. if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 { existing.Replicas[0].DataAddr = info.ReplicaDataAddr @@ -350,6 +356,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master existing.Replicas[i].HealthScore = info.HealthScore existing.Replicas[i].LastHeartbeat = time.Now() existing.Replicas[i].Role = info.Role + existing.Replicas[i].NvmeAddr = info.NvmeAddr + existing.Replicas[i].NQN = info.Nqn if existing.WALHeadLSN > info.WalHeadLsn { existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn } else { @@ -369,6 +377,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master WALHeadLSN: info.WalHeadLsn, LastHeartbeat: time.Now(), Role: info.Role, + NvmeAddr: info.NvmeAddr, + NQN: info.Nqn, } existing.Replicas = append(existing.Replicas, ri) r.addToServer(server, existingName) @@ -408,6 +418,9 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if info.ReplicaCtrlAddr != "" { entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr } + // NVMe publication: propagate NVMe fields from heartbeat. + entry.NvmeAddr = info.NvmeAddr + entry.NQN = info.Nqn r.volumes[name] = entry r.addToServer(server, name) glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)", @@ -816,6 +829,8 @@ func (r *BlockVolumeRegistry) PromoteBestReplica(name string) (uint64, error) { entry.Path = promoted.Path entry.IQN = promoted.IQN entry.ISCSIAddr = promoted.ISCSIAddr + entry.NvmeAddr = promoted.NvmeAddr + entry.NQN = promoted.NQN entry.Epoch = newEpoch entry.Role = blockvol.RoleToWire(blockvol.RolePrimary) entry.LastLeaseGrant = time.Now() diff --git a/weed/server/master_grpc_server_block.go b/weed/server/master_grpc_server_block.go index 4c4a3f5f2..e41e8af2f 100644 --- a/weed/server/master_grpc_server_block.go +++ b/weed/server/master_grpc_server_block.go @@ -287,6 +287,8 @@ func (ms *MasterServer) tryCreateOneReplica(ctx context.Context, req *master_pb. Path: replicaResult.Path, ISCSIAddr: replicaResult.ISCSIAddr, IQN: replicaResult.IQN, + NvmeAddr: replicaResult.NvmeAddr, + NQN: replicaResult.NQN, DataAddr: replicaResult.ReplicaDataAddr, CtrlAddr: replicaResult.ReplicaCtrlAddr, }) diff --git a/weed/server/master_grpc_server_block_test.go b/weed/server/master_grpc_server_block_test.go index 3d10607bb..ef4c9bc5f 100644 --- a/weed/server/master_grpc_server_block_test.go +++ b/weed/server/master_grpc_server_block_test.go @@ -6,6 +6,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -1099,3 +1100,81 @@ func TestMaster_NoNvmeFieldsWhenDisabled(t *testing.T) { t.Fatalf("Lookup NVMe fields should be empty, got addr=%q nqn=%q", lresp.NvmeAddr, lresp.Nqn) } } + +func TestMaster_PromotionCopiesNvmeFields(t *testing.T) { + ms := testMasterServer(t) + + // Directly register an entry with primary + replica, both having NVMe fields. + ms.blockRegistry.Register(&BlockVolumeEntry{ + Name: "ha-vol", + VolumeServer: "vs1:9333", + Path: "/data/ha-vol.blk", + IQN: "iqn.2024.test:ha-vol", + ISCSIAddr: "vs1:3260", + NvmeAddr: "vs1:4420", + NQN: "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs1", + SizeBytes: 1 << 30, + Epoch: 5, + Role: 1, // RolePrimary + LeaseTTL: 30 * time.Second, + Replicas: []ReplicaInfo{ + { + Server: "vs2:9333", + Path: "/data/ha-vol.blk", + IQN: "iqn.2024.test:ha-vol-r", + ISCSIAddr: "vs2:3260", + NvmeAddr: "vs2:4420", + NQN: "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs2", + DataAddr: "vs2:14260", + CtrlAddr: "vs2:14261", + HealthScore: 0.95, + WALHeadLSN: 100, + }, + }, + }) + // Wire byServer index for replica. + ms.blockRegistry.mu.Lock() + ms.blockRegistry.addToServer("vs2:9333", "ha-vol") + ms.blockRegistry.mu.Unlock() + + // Pre-promotion: verify primary NVMe fields. + entry, _ := ms.blockRegistry.Lookup("ha-vol") + if entry.NvmeAddr != "vs1:4420" { + t.Fatalf("pre-promotion NvmeAddr: got %q, want vs1:4420", entry.NvmeAddr) + } + + // Promote replica. + newEpoch, err := ms.blockRegistry.PromoteBestReplica("ha-vol") + if err != nil { + t.Fatalf("PromoteBestReplica: %v", err) + } + if newEpoch != 6 { + t.Fatalf("newEpoch: got %d, want 6", newEpoch) + } + + // After promotion: entry should have replica's NVMe fields. + entry, _ = ms.blockRegistry.Lookup("ha-vol") + if entry.VolumeServer != "vs2:9333" { + t.Fatalf("after promotion, VolumeServer: got %q, want vs2:9333", entry.VolumeServer) + } + if entry.NvmeAddr != "vs2:4420" { + t.Fatalf("after promotion, NvmeAddr: got %q, want vs2:4420", entry.NvmeAddr) + } + if entry.NQN != "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs2" { + t.Fatalf("after promotion, NQN: got %q", entry.NQN) + } + + // Lookup should return the promoted replica's NVMe fields immediately. + lresp, err := ms.LookupBlockVolume(context.Background(), &master_pb.LookupBlockVolumeRequest{ + Name: "ha-vol", + }) + if err != nil { + t.Fatalf("LookupBlockVolume: %v", err) + } + if lresp.NvmeAddr != "vs2:4420" { + t.Fatalf("Lookup NvmeAddr after promotion: got %q, want vs2:4420", lresp.NvmeAddr) + } + if lresp.Nqn != "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs2" { + t.Fatalf("Lookup Nqn after promotion: got %q", lresp.Nqn) + } +}