Browse Source

correctly count volumes, to ensure proper capacity management

pull/2/head
Chris Lu 12 years ago
parent
commit
7e83a75fba
  1. 9
      weed-fs/src/pkg/topology/data_node.go
  2. 18
      weed-fs/src/pkg/topology/node.go
  3. 1
      weed-fs/src/pkg/topology/topology_event_handling.go

9
weed-fs/src/pkg/topology/data_node.go

@ -13,7 +13,7 @@ type DataNode struct {
Port int Port int
PublicUrl string PublicUrl string
LastSeen int64 // unix time in seconds LastSeen int64 // unix time in seconds
Dead bool
Dead bool
} }
func NewDataNode(id string) *DataNode { func NewDataNode(id string) *DataNode {
@ -21,12 +21,13 @@ func NewDataNode(id string) *DataNode {
s.id = NodeId(id) s.id = NodeId(id)
s.nodeType = "DataNode" s.nodeType = "DataNode"
s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
s.NodeImpl.value = s
s.NodeImpl.value = s
return s return s
} }
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok { if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustActiveVolumeCountDelta(1)
dn.UpAdjustMaxVolumeId(v.Id) dn.UpAdjustMaxVolumeId(v.Id)
} else { } else {
@ -45,13 +46,13 @@ func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port return dn.Ip == ip && dn.Port == port
} }
func (dn *DataNode) Url() string { func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
return dn.Ip + ":" + strconv.Itoa(dn.Port)
} }
func (dn *DataNode) ToMap() interface{} { func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{}) ret := make(map[string]interface{})
ret["Url"] = dn.Url() ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetActiveVolumeCount()
ret["Volumes"] = dn.GetVolumeCount()
ret["Max"] = dn.GetMaxVolumeCount() ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace() ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl ret["PublicUrl"] = dn.PublicUrl

18
weed-fs/src/pkg/topology/node.go

@ -12,9 +12,11 @@ type Node interface {
FreeSpace() int FreeSpace() int
ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
UpAdjustMaxVolumeId(vid storage.VolumeId) UpAdjustMaxVolumeId(vid storage.VolumeId)
GetVolumeCount() int
GetActiveVolumeCount() int GetActiveVolumeCount() int
GetMaxVolumeCount() int GetMaxVolumeCount() int
GetMaxVolumeId() storage.VolumeId GetMaxVolumeId() storage.VolumeId
@ -31,6 +33,7 @@ type Node interface {
} }
type NodeImpl struct { type NodeImpl struct {
id NodeId id NodeId
volumeCount int
activeVolumeCount int activeVolumeCount int
maxVolumeCount int maxVolumeCount int
parent Node parent Node
@ -61,7 +64,7 @@ func (n *NodeImpl) Id() NodeId {
return n.id return n.id
} }
func (n *NodeImpl) FreeSpace() int { func (n *NodeImpl) FreeSpace() int {
return n.maxVolumeCount - n.activeVolumeCount
return n.maxVolumeCount - n.volumeCount
} }
func (n *NodeImpl) SetParent(node Node) { func (n *NodeImpl) SetParent(node Node) {
n.parent = node n.parent = node
@ -106,6 +109,12 @@ func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
n.volumeCount += volumeCountDelta
if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
n.activeVolumeCount += activeVolumeCountDelta n.activeVolumeCount += activeVolumeCountDelta
if n.parent != nil { if n.parent != nil {
@ -123,6 +132,9 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
return n.maxVolumeId return n.maxVolumeId
} }
func (n *NodeImpl) GetVolumeCount() int {
return n.volumeCount
}
func (n *NodeImpl) GetActiveVolumeCount() int { func (n *NodeImpl) GetActiveVolumeCount() int {
return n.activeVolumeCount return n.activeVolumeCount
} }
@ -135,6 +147,7 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.children[node.Id()] = node n.children[node.Id()] = node
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n) node.SetParent(n)
fmt.Println(n, "adds child", node.Id()) fmt.Println(n, "adds child", node.Id())
@ -146,6 +159,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil) node.SetParent(nil)
if node != nil { if node != nil {
delete(n.children, node.Id()) delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
@ -164,7 +178,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
} }
for _, v := range dn.volumes { for _, v := range dn.volumes {
if uint64(v.Size) >= volumeSizeLimit { if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v n.GetTopology().chanFullVolumes <- v
} }
} }

1
weed-fs/src/pkg/topology/topology_event_handling.go

@ -52,6 +52,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
vl := t.GetVolumeLayout(v.RepType) vl := t.GetVolumeLayout(v.RepType)
vl.SetVolumeUnavailable(dn, v.Id) vl.SetVolumeUnavailable(dn, v.Id)
} }
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id()) dn.Parent().UnlinkChildNode(dn.Id())

Loading…
Cancel
Save