|
|
@ -50,13 +50,20 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl |
|
|
r.Lock() |
|
|
r.Lock() |
|
|
defer r.Unlock() |
|
|
defer r.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
// Normalize the id parameter (trim whitespace)
|
|
|
|
|
|
id = strings.TrimSpace(id) |
|
|
|
|
|
|
|
|
// Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
|
|
|
// Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
|
|
|
nodeId := util.GetVolumeServerId(id, ip, port) |
|
|
nodeId := util.GetVolumeServerId(id, ip, port) |
|
|
|
|
|
|
|
|
// First, try to find by node ID using O(1) map lookup (stable identity)
|
|
|
// First, try to find by node ID using O(1) map lookup (stable identity)
|
|
|
if c, ok := r.children[NodeId(nodeId)]; ok { |
|
|
if c, ok := r.children[NodeId(nodeId)]; ok { |
|
|
dn := c.(*DataNode) |
|
|
dn := c.(*DataNode) |
|
|
// Update the IP/Port in case they changed (e.g., pod rescheduled in K8s)
|
|
|
|
|
|
|
|
|
// Log if IP or Port changed (e.g., pod rescheduled in K8s)
|
|
|
|
|
|
if dn.Ip != ip || dn.Port != port { |
|
|
|
|
|
glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port) |
|
|
|
|
|
} |
|
|
|
|
|
// Update the IP/Port in case they changed
|
|
|
dn.Ip = ip |
|
|
dn.Ip = ip |
|
|
dn.Port = port |
|
|
dn.Port = port |
|
|
dn.GrpcPort = grpcPort |
|
|
dn.GrpcPort = grpcPort |
|
|
|