diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 1df109da6..01f8f768a 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -13,7 +13,7 @@ type DataNode struct { Port int PublicUrl string LastSeen int64 // unix time in seconds - Dead bool + Dead bool } func NewDataNode(id string) *DataNode { @@ -21,12 +21,13 @@ func NewDataNode(id string) *DataNode { s.id = NodeId(id) s.nodeType = "DataNode" s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) - s.NodeImpl.value = s + s.NodeImpl.value = s return s } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v + dn.UpAdjustVolumeCountDelta(1) dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustMaxVolumeId(v.Id) } else { @@ -45,13 +46,13 @@ func (dn *DataNode) MatchLocation(ip string, port int) bool { return dn.Ip == ip && dn.Port == port } func (dn *DataNode) Url() string { - return dn.Ip + ":" + strconv.Itoa(dn.Port) + return dn.Ip + ":" + strconv.Itoa(dn.Port) } func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetActiveVolumeCount() + ret["Volumes"] = dn.GetVolumeCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index c65e53217..02b427a2b 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -12,9 +12,11 @@ type Node interface { FreeSpace() int ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) + UpAdjustVolumeCountDelta(volumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) UpAdjustMaxVolumeId(vid storage.VolumeId) + GetVolumeCount() int GetActiveVolumeCount() int GetMaxVolumeCount() int GetMaxVolumeId() storage.VolumeId @@ -31,6 +33,7 @@ type Node interface { } type NodeImpl struct { id NodeId + volumeCount int activeVolumeCount int maxVolumeCount int parent Node @@ -61,7 +64,7 @@ func (n *NodeImpl) Id() NodeId { return n.id } func (n *NodeImpl) FreeSpace() int { - return n.maxVolumeCount - n.activeVolumeCount + return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { n.parent = node @@ -106,6 +109,12 @@ func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative + n.volumeCount += volumeCountDelta + if n.parent != nil { + n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) + } +} func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative n.activeVolumeCount += activeVolumeCountDelta if n.parent != nil { @@ -123,6 +132,9 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { return n.maxVolumeId } +func (n *NodeImpl) GetVolumeCount() int { + return n.volumeCount +} func (n *NodeImpl) GetActiveVolumeCount() int { return n.activeVolumeCount } @@ -135,6 +147,7 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.children[node.Id()] = node n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) + n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) node.SetParent(n) fmt.Println(n, "adds child", node.Id()) @@ -146,6 +159,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node.SetParent(nil) if node != nil { delete(n.children, node.Id()) + n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) @@ -164,7 +178,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi } for _, v := range dn.volumes { if uint64(v.Size) >= volumeSizeLimit { - //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) + //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) n.GetTopology().chanFullVolumes <- v } } diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go index 4ff1a0bcc..08c08fcde 100644 --- a/weed-fs/src/pkg/topology/topology_event_handling.go +++ b/weed-fs/src/pkg/topology/topology_event_handling.go @@ -52,6 +52,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { vl := t.GetVolumeLayout(v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id())