Browse Source

shell: volume.balance plan by ratio of fullness

pull/1475/head
Chris Lu 4 years ago
parent
commit
d15682b4a1
  1. 4
      weed/shell/command_ec_common.go
  2. 79
      weed/shell/command_volume_balance.go

4
weed/shell/command_ec_common.go

@ -253,6 +253,10 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
})
}
func divide(total, n int) float64 {
return float64(total) / float64(n)
}
func ceilDivide(total, n int) int {
return int(math.Ceil(float64(total) / float64(n)))
}

79
weed/shell/command_volume_balance.go

@ -83,35 +83,29 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
for maxVolumeCount, volumeServers := range typeToNodes {
if len(volumeServers) < 2 {
fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
continue
if *collection == "EACH_COLLECTION" {
collections, err := ListCollectionNames(commandEnv, true, false)
if err != nil {
return err
}
if *collection == "EACH_COLLECTION" {
collections, err := ListCollectionNames(commandEnv, true, false)
if err != nil {
return err
}
for _, c := range collections {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
} else if *collection == "ALL_COLLECTIONS" {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
for _, c := range collections {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
} else if *collection == "ALL_COLLECTIONS" {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
return err
}
}
return nil
}
@ -150,15 +144,14 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
return nil
}
func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
typeToNodes = make(map[uint64][]*Node)
func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter string) (nodes []*Node) {
for _, dc := range t.DataCenterInfos {
if selectedDataCenter != "" && dc.Id != selectedDataCenter {
continue
}
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
nodes = append(nodes, &Node{
info: dn,
dc: dc.Id,
rack: r.Id,
@ -176,6 +169,23 @@ type Node struct {
rack string
}
func (n *Node) localVolumeRatio() float64 {
return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
}
func (n *Node) localVolumeNextRatio() float64 {
return divide(len(n.selectedVolumes) + 1, int(n.info.MaxVolumeCount))
}
func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
for _, v := range n.info.VolumeInfos {
if fn(v) {
n.selectedVolumes[v.Id] = v
}
}
}
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
sort.Slice(volumes, func(i, j int) bool {
return volumes[i].Size < volumes[j].Size
@ -189,20 +199,20 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
}
func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
selectedVolumeCount := 0
selectedVolumeCount, volumeMaxCount := 0, 0
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
volumeMaxCount += int(dn.info.MaxVolumeCount)
}
idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))
idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
hasMoved := true
for hasMoved {
hasMoved = false
sort.Slice(nodes, func(i, j int) bool {
// TODO sort by free volume slots???
return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
})
fullNode := nodes[len(nodes)-1]
@ -214,7 +224,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*
for i := 0; i < len(nodes)-1; i++ {
emptyNode := nodes[i]
if !(len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes) {
if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
// no more volume servers with empty slots
break
}
@ -265,15 +275,6 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
return nil
}
func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
for _, v := range node.info.VolumeInfos {
if fn(v) {
node.selectedVolumes[v.Id] = v
}
}
}
func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool {
for _, replica := range existingReplicas {
if replica.location.dataNode.Id == targetNode.info.Id &&

Loading…
Cancel
Save