Browse Source

topology: support id-based DataNode identification

Update GetOrCreateDataNode to accept an id parameter for stable node
identification. When id is provided, the DataNode can maintain its identity
even when its IP address changes (e.g., in Kubernetes pod reschedules).

For backward compatibility:
- If id is provided, use it as the node ID
- If id is empty, fall back to ip:port

Ref: https://github.com/seaweedfs/seaweedfs/issues/7487
pull/7609/head
chrislu 2 days ago
parent
commit
c5a4348c02
  1. 63
      weed/topology/rack.go
  2. 10
      weed/topology/topology_test.go

63
weed/topology/rack.go

@ -1,10 +1,12 @@
package topology
import (
"net"
"slices"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@ -34,17 +36,66 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
// FindDataNodeById finds a DataNode by its ID using O(1) map lookup
func (r *Rack) FindDataNodeById(id string) *DataNode {
r.RLock()
defer r.RUnlock()
if c, ok := r.children[NodeId(id)]; ok {
return c.(*DataNode)
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode {
r.Lock()
defer r.Unlock()
for _, c := range r.children {
// Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
nodeId := id
if nodeId == "" {
nodeId = util.JoinHostPort(ip, port)
}
// First, try to find by node ID using O(1) map lookup (stable identity)
if c, ok := r.children[NodeId(nodeId)]; ok {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
return dn
// Update the IP/Port in case they changed (e.g., pod rescheduled in K8s)
dn.Ip = ip
dn.Port = port
dn.GrpcPort = grpcPort
dn.PublicUrl = publicUrl
dn.LastSeen = time.Now().Unix()
return dn
}
// For backward compatibility: if id was provided, also check by ip:port
// to handle transition from old (ip:port) to new (explicit id) behavior
if id != "" {
for oldId, c := range r.children {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
// Only transition if the oldId is in ip:port format (legacy identification).
// If oldId is an explicit id (not ip:port format), this is a different node
// that happens to reuse the same ip:port - don't incorrectly merge them.
if _, _, err := net.SplitHostPort(string(oldId)); err != nil {
// oldId is not in ip:port format, so it's an explicit id from another node
glog.Warningf("Volume server with id %s has ip:port %s:%d which is used by node %s", id, ip, port, oldId)
continue
}
// Found a node by ip:port, transition it to use the new explicit id
glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, id)
// Re-key the node in the children map with the new id
delete(r.children, oldId)
dn.id = NodeId(id)
r.children[NodeId(id)] = dn
dn.LastSeen = time.Now().Unix()
return dn
}
}
}
dn := NewDataNode(util.JoinHostPort(ip, port))
dn := NewDataNode(nodeId)
dn.Ip = ip
dn.Port = port
dn.GrpcPort = grpcPort

10
weed/topology/topology_test.go

@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
{
volumeCount := 7
@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable volume
v := storage.VolumeInfo{
@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable, local volume
v := storage.VolumeInfo{
@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(1111),

Loading…
Cancel
Save