Browse Source

fix: propagate NVMe fields through replica creation, heartbeat, and promotion

ReplicaInfo now carries NvmeAddr/NQN. Fields are populated during
replica allocation (tryCreateOneReplica), updated from replica
heartbeats, and copied in PromoteBestReplica. This ensures master
lookup returns correct NVMe endpoints immediately after failover,
without waiting for the first post-promotion heartbeat.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 2 days ago
parent
commit
a7b1b4cb22
  1. 15
      weed/server/master_block_registry.go
  2. 2
      weed/server/master_grpc_server_block.go
  3. 79
      weed/server/master_grpc_server_block_test.go

15
weed/server/master_block_registry.go

@ -28,6 +28,8 @@ type ReplicaInfo struct {
Path string // file path on replica VS Path string // file path on replica VS
ISCSIAddr string // iSCSI target address ISCSIAddr string // iSCSI target address
IQN string // iSCSI qualified name IQN string // iSCSI qualified name
NvmeAddr string // NVMe/TCP target address (ip:port), empty if NVMe disabled
NQN string // NVMe subsystem NQN, empty if NVMe disabled
DataAddr string // WAL receiver data listen addr DataAddr string // WAL receiver data listen addr
CtrlAddr string // WAL receiver ctrl listen addr CtrlAddr string // WAL receiver ctrl listen addr
HealthScore float64 // from heartbeat (0.0-1.0) HealthScore float64 // from heartbeat (0.0-1.0)
@ -336,6 +338,10 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
if info.ReplicaCtrlAddr != "" { if info.ReplicaCtrlAddr != "" {
existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr existing.ReplicaCtrlAddr = info.ReplicaCtrlAddr
} }
// NVMe publication: update NVMe fields from heartbeat.
// Required for master restart reconstruction and NVMe enable/disable.
existing.NvmeAddr = info.NvmeAddr
existing.NQN = info.Nqn
// Sync first replica's data addrs to Replicas[]. // Sync first replica's data addrs to Replicas[].
if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 { if info.ReplicaDataAddr != "" && len(existing.Replicas) > 0 {
existing.Replicas[0].DataAddr = info.ReplicaDataAddr existing.Replicas[0].DataAddr = info.ReplicaDataAddr
@ -350,6 +356,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
existing.Replicas[i].HealthScore = info.HealthScore existing.Replicas[i].HealthScore = info.HealthScore
existing.Replicas[i].LastHeartbeat = time.Now() existing.Replicas[i].LastHeartbeat = time.Now()
existing.Replicas[i].Role = info.Role existing.Replicas[i].Role = info.Role
existing.Replicas[i].NvmeAddr = info.NvmeAddr
existing.Replicas[i].NQN = info.Nqn
if existing.WALHeadLSN > info.WalHeadLsn { if existing.WALHeadLSN > info.WalHeadLsn {
existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn existing.Replicas[i].WALLag = existing.WALHeadLSN - info.WalHeadLsn
} else { } else {
@ -369,6 +377,8 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
WALHeadLSN: info.WalHeadLsn, WALHeadLSN: info.WalHeadLsn,
LastHeartbeat: time.Now(), LastHeartbeat: time.Now(),
Role: info.Role, Role: info.Role,
NvmeAddr: info.NvmeAddr,
NQN: info.Nqn,
} }
existing.Replicas = append(existing.Replicas, ri) existing.Replicas = append(existing.Replicas, ri)
r.addToServer(server, existingName) r.addToServer(server, existingName)
@ -408,6 +418,9 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
if info.ReplicaCtrlAddr != "" { if info.ReplicaCtrlAddr != "" {
entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr entry.ReplicaCtrlAddr = info.ReplicaCtrlAddr
} }
// NVMe publication: propagate NVMe fields from heartbeat.
entry.NvmeAddr = info.NvmeAddr
entry.NQN = info.Nqn
r.volumes[name] = entry r.volumes[name] = entry
r.addToServer(server, name) r.addToServer(server, name)
glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)", glog.V(0).Infof("block registry: auto-registered %q from heartbeat (server=%s, path=%s, size=%d)",
@ -816,6 +829,8 @@ func (r *BlockVolumeRegistry) PromoteBestReplica(name string) (uint64, error) {
entry.Path = promoted.Path entry.Path = promoted.Path
entry.IQN = promoted.IQN entry.IQN = promoted.IQN
entry.ISCSIAddr = promoted.ISCSIAddr entry.ISCSIAddr = promoted.ISCSIAddr
entry.NvmeAddr = promoted.NvmeAddr
entry.NQN = promoted.NQN
entry.Epoch = newEpoch entry.Epoch = newEpoch
entry.Role = blockvol.RoleToWire(blockvol.RolePrimary) entry.Role = blockvol.RoleToWire(blockvol.RolePrimary)
entry.LastLeaseGrant = time.Now() entry.LastLeaseGrant = time.Now()

2
weed/server/master_grpc_server_block.go

@ -287,6 +287,8 @@ func (ms *MasterServer) tryCreateOneReplica(ctx context.Context, req *master_pb.
Path: replicaResult.Path, Path: replicaResult.Path,
ISCSIAddr: replicaResult.ISCSIAddr, ISCSIAddr: replicaResult.ISCSIAddr,
IQN: replicaResult.IQN, IQN: replicaResult.IQN,
NvmeAddr: replicaResult.NvmeAddr,
NQN: replicaResult.NQN,
DataAddr: replicaResult.ReplicaDataAddr, DataAddr: replicaResult.ReplicaDataAddr,
CtrlAddr: replicaResult.ReplicaCtrlAddr, CtrlAddr: replicaResult.ReplicaCtrlAddr,
}) })

79
weed/server/master_grpc_server_block_test.go

@ -6,6 +6,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@ -1099,3 +1100,81 @@ func TestMaster_NoNvmeFieldsWhenDisabled(t *testing.T) {
t.Fatalf("Lookup NVMe fields should be empty, got addr=%q nqn=%q", lresp.NvmeAddr, lresp.Nqn) t.Fatalf("Lookup NVMe fields should be empty, got addr=%q nqn=%q", lresp.NvmeAddr, lresp.Nqn)
} }
} }
// TestMaster_PromotionCopiesNvmeFields registers a volume whose primary and
// single replica both carry NVMe fields, promotes the replica through
// PromoteBestReplica, and then checks that the registry entry and the
// LookupBlockVolume response both report the replica's NVMe address and NQN
// without any intervening heartbeat.
func TestMaster_PromotionCopiesNvmeFields(t *testing.T) {
	const (
		volName         = "ha-vol"
		replicaServer   = "vs2:9333"
		primaryNvmeAddr = "vs1:4420"
		replicaNvmeAddr = "vs2:4420"
		replicaNQN      = "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs2"
	)

	ms := testMasterServer(t)

	// Seed the registry directly: one primary (vs1) plus one replica (vs2),
	// each with its own NVMe endpoint.
	seed := &BlockVolumeEntry{
		Name:         volName,
		VolumeServer: "vs1:9333",
		Path:         "/data/ha-vol.blk",
		IQN:          "iqn.2024.test:ha-vol",
		ISCSIAddr:    "vs1:3260",
		NvmeAddr:     primaryNvmeAddr,
		NQN:          "nqn.2024-01.com.seaweedfs:vol.ha-vol.vs1",
		SizeBytes:    1 << 30,
		Epoch:        5,
		Role:         1, // RolePrimary
		LeaseTTL:     30 * time.Second,
		Replicas: []ReplicaInfo{
			{
				Server:      replicaServer,
				Path:        "/data/ha-vol.blk",
				IQN:         "iqn.2024.test:ha-vol-r",
				ISCSIAddr:   "vs2:3260",
				NvmeAddr:    replicaNvmeAddr,
				NQN:         replicaNQN,
				DataAddr:    "vs2:14260",
				CtrlAddr:    "vs2:14261",
				HealthScore: 0.95,
				WALHeadLSN:  100,
			},
		},
	}
	ms.blockRegistry.Register(seed)

	// The replica's server is added to the byServer index by hand, under the
	// registry lock, because the entry was registered directly.
	ms.blockRegistry.mu.Lock()
	ms.blockRegistry.addToServer(replicaServer, volName)
	ms.blockRegistry.mu.Unlock()

	// Sanity check before promotion: the entry still exposes the primary's
	// NVMe address.
	// NOTE(review): the second result of Lookup is discarded here and below;
	// confirm whether a failed lookup should fail the test explicitly.
	before, _ := ms.blockRegistry.Lookup(volName)
	if got := before.NvmeAddr; got != primaryNvmeAddr {
		t.Fatalf("pre-promotion NvmeAddr: got %q, want vs1:4420", got)
	}

	// Promote the replica; the seeded epoch was 5, so 6 is expected back.
	promotedEpoch, err := ms.blockRegistry.PromoteBestReplica(volName)
	if err != nil {
		t.Fatalf("PromoteBestReplica: %v", err)
	}
	if promotedEpoch != 6 {
		t.Fatalf("newEpoch: got %d, want 6", promotedEpoch)
	}

	// The registry entry must now describe the promoted replica.
	after, _ := ms.blockRegistry.Lookup(volName)
	if got := after.VolumeServer; got != replicaServer {
		t.Fatalf("after promotion, VolumeServer: got %q, want vs2:9333", got)
	}
	if got := after.NvmeAddr; got != replicaNvmeAddr {
		t.Fatalf("after promotion, NvmeAddr: got %q, want vs2:4420", got)
	}
	if got := after.NQN; got != replicaNQN {
		t.Fatalf("after promotion, NQN: got %q", got)
	}

	// The gRPC lookup must report the same NVMe fields immediately.
	lookupReq := &master_pb.LookupBlockVolumeRequest{Name: volName}
	resp, err := ms.LookupBlockVolume(context.Background(), lookupReq)
	if err != nil {
		t.Fatalf("LookupBlockVolume: %v", err)
	}
	if got := resp.NvmeAddr; got != replicaNvmeAddr {
		t.Fatalf("Lookup NvmeAddr after promotion: got %q, want vs2:4420", got)
	}
	if got := resp.Nqn; got != replicaNQN {
		t.Fatalf("Lookup Nqn after promotion: got %q", got)
	}
}
Loading…
Cancel
Save