diff --git a/weed/topology/rack.go b/weed/topology/rack.go index f526cd84d..246e8f190 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -1,10 +1,12 @@ package topology import ( + "net" "slices" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" @@ -34,17 +36,66 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode { + +// FindDataNodeById finds a DataNode by its ID using O(1) map lookup +func (r *Rack) FindDataNodeById(id string) *DataNode { + r.RLock() + defer r.RUnlock() + if c, ok := r.children[NodeId(id)]; ok { + return c.(*DataNode) + } + return nil +} + +func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode { r.Lock() defer r.Unlock() - for _, c := range r.children { + + // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility + nodeId := id + if nodeId == "" { + nodeId = util.JoinHostPort(ip, port) + } + + // First, try to find by node ID using O(1) map lookup (stable identity) + if c, ok := r.children[NodeId(nodeId)]; ok { dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - dn.LastSeen = time.Now().Unix() - return dn + // Update the IP/Port in case they changed (e.g., pod rescheduled in K8s) + dn.Ip = ip + dn.Port = port + dn.GrpcPort = grpcPort + dn.PublicUrl = publicUrl + dn.LastSeen = time.Now().Unix() + return dn + } + + // For backward compatibility: if id was provided, also check by ip:port + // to handle transition from old (ip:port) to new (explicit id) behavior + if id != "" { + for oldId, c := range r.children { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + // Only transition if the oldId is in ip:port format (legacy identification). + // If oldId is an explicit id (not ip:port format), this is a different node + // that happens to reuse the same ip:port - don't incorrectly merge them. + if _, _, err := net.SplitHostPort(string(oldId)); err != nil { + // oldId is not in ip:port format, so it's an explicit id from another node + glog.Warningf("Volume server with id %s has ip:port %s:%d which is used by node %s", id, ip, port, oldId) + continue + } + // Found a node by ip:port, transition it to use the new explicit id + glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, id) + // Re-key the node in the children map with the new id + delete(r.children, oldId) + dn.id = NodeId(id) + r.children[NodeId(id)] = dn + dn.LastSeen = time.Now().Unix() + return dn + } } } - dn := NewDataNode(util.JoinHostPort(ip, port)) + + dn := NewDataNode(nodeId) dn.Ip = ip dn.Port = port dn.GrpcPort = grpcPort diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 8515d2f81..02bf12e07 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) { volumeCount := 7 @@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) v := storage.VolumeInfo{ Id: needle.VolumeId(1), @@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable volume v := storage.VolumeInfo{ @@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable, local volume v := storage.VolumeInfo{ @@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) { topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil) topo.RegisterVolumeLayout(storage.VolumeInfo{ Id: needle.VolumeId(1111),