Browse Source

free volume slots factor in ec shard counts

pull/991/head
Chris Lu 6 years ago
parent
commit
eaa76f11b7
  1. 2
      weed/topology/data_node.go
  2. 21
      weed/topology/data_node_ec.go
  3. 26
      weed/topology/node.go

2
weed/topology/data_node.go

@ -146,7 +146,7 @@ func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{}) ret := make(map[string]interface{})
ret["Url"] = dn.Url() ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount() ret["Volumes"] = dn.GetVolumeCount()
ret["EcShards"] = dn.GetEcShardsCount()
ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount() ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace() ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl ret["PublicUrl"] = dn.PublicUrl

21
weed/topology/data_node_ec.go

@ -14,17 +14,6 @@ func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
return ret return ret
} }
func (dn *DataNode) GetEcShardsCount() (count int) {
dn.RLock()
defer dn.RUnlock()
for _, ecVolumeInfo := range dn.ecShards {
count += ecVolumeInfo.ShardBits.ShardIdCount()
}
return count
}
func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) { func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
// prepare the new ec shard map // prepare the new ec shard map
actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
@ -61,6 +50,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
// if changed, set to the new ec shard map // if changed, set to the new ec shard map
dn.ecShardsLock.Lock() dn.ecShardsLock.Lock()
dn.ecShards = actualEcShardMap dn.ecShards = actualEcShardMap
dn.UpAdjustEcShardCountDelta(int64(len(newShards) - len(deletedShards)))
dn.ecShardsLock.Unlock() dn.ecShardsLock.Unlock()
} }
@ -83,12 +73,18 @@ func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
dn.ecShardsLock.Lock() dn.ecShardsLock.Lock()
defer dn.ecShardsLock.Unlock() defer dn.ecShardsLock.Unlock()
delta := 0
if existing, ok := dn.ecShards[s.VolumeId]; !ok { if existing, ok := dn.ecShards[s.VolumeId]; !ok {
dn.ecShards[s.VolumeId] = s dn.ecShards[s.VolumeId] = s
delta = s.ShardBits.ShardIdCount()
} else { } else {
oldCount := existing.ShardBits.ShardIdCount()
existing.ShardBits = existing.ShardBits.Plus(s.ShardBits) existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
delta = existing.ShardBits.ShardIdCount() - oldCount
} }
dn.UpAdjustEcShardCountDelta(int64(delta))
} }
func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) { func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
@ -96,7 +92,10 @@ func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
defer dn.ecShardsLock.Unlock() defer dn.ecShardsLock.Unlock()
if existing, ok := dn.ecShards[s.VolumeId]; ok { if existing, ok := dn.ecShards[s.VolumeId]; ok {
oldCount := existing.ShardBits.ShardIdCount()
existing.ShardBits = existing.ShardBits.Minus(s.ShardBits) existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
delta := existing.ShardBits.ShardIdCount() - oldCount
dn.UpAdjustEcShardCountDelta(int64(delta))
if existing.ShardBits.ShardIdCount() == 0 { if existing.ShardBits.ShardIdCount() == 0 {
delete(dn.ecShards, s.VolumeId) delete(dn.ecShards, s.VolumeId)
} }

26
weed/topology/node.go

@ -8,6 +8,7 @@ import (
"sync/atomic" "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
) )
@ -19,10 +20,12 @@ type Node interface {
ReserveOneVolume(r int64) (*DataNode, error) ReserveOneVolume(r int64) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64)
UpAdjustEcShardCountDelta(ecShardCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
UpAdjustMaxVolumeId(vid needle.VolumeId) UpAdjustMaxVolumeId(vid needle.VolumeId)
GetVolumeCount() int64 GetVolumeCount() int64
GetEcShardCount() int64
GetActiveVolumeCount() int64 GetActiveVolumeCount() int64
GetMaxVolumeCount() int64 GetMaxVolumeCount() int64
GetMaxVolumeId() needle.VolumeId GetMaxVolumeId() needle.VolumeId
@ -40,14 +43,15 @@ type Node interface {
GetValue() interface{} //get reference to the topology,dc,rack,datanode GetValue() interface{} //get reference to the topology,dc,rack,datanode
} }
type NodeImpl struct { type NodeImpl struct {
id NodeId
volumeCount int64 volumeCount int64
activeVolumeCount int64 activeVolumeCount int64
ecShardCount int64
maxVolumeCount int64 maxVolumeCount int64
id NodeId
parent Node parent Node
sync.RWMutex // lock children
children map[NodeId]Node
maxVolumeId needle.VolumeId
sync.RWMutex // lock children
children map[NodeId]Node
maxVolumeId needle.VolumeId
//for rack, data center, topology //for rack, data center, topology
nodeType string nodeType string
@ -128,6 +132,9 @@ func (n *NodeImpl) Id() NodeId {
return n.id return n.id
} }
func (n *NodeImpl) FreeSpace() int64 { func (n *NodeImpl) FreeSpace() int64 {
if n.ecShardCount > 0 {
return n.maxVolumeCount - n.volumeCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
}
return n.maxVolumeCount - n.volumeCount return n.maxVolumeCount - n.volumeCount
} }
func (n *NodeImpl) SetParent(node Node) { func (n *NodeImpl) SetParent(node Node) {
@ -184,6 +191,12 @@ func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be n
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
} }
} }
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
if n.parent != nil {
n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
}
}
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil { if n.parent != nil {
@ -204,6 +217,9 @@ func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
func (n *NodeImpl) GetVolumeCount() int64 { func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount return n.volumeCount
} }
func (n *NodeImpl) GetEcShardCount() int64 {
return n.ecShardCount
}
func (n *NodeImpl) GetActiveVolumeCount() int64 { func (n *NodeImpl) GetActiveVolumeCount() int64 {
return n.activeVolumeCount return n.activeVolumeCount
} }
@ -219,6 +235,7 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n) node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id()) glog.V(0).Infoln(n, "adds child", node.Id())
@ -233,6 +250,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil) node.SetParent(nil)
delete(n.children, node.Id()) delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id()) glog.V(0).Infoln(n, "removes", node.Id())

Loading…
Cancel
Save