|
|
@ -81,12 +81,14 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer |
|
|
|
if len(volumeServers) < 2 { |
|
|
|
continue |
|
|
|
} |
|
|
|
balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing) |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) { |
|
|
|
func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error { |
|
|
|
var nodes []*Node |
|
|
|
for _, dn := range dataNodeInfos { |
|
|
|
nodes = append(nodes, &Node{ |
|
|
@ -105,7 +107,9 @@ func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.Dat |
|
|
|
return !v.ReadOnly && v.Size < volumeSizeLimit |
|
|
|
}) |
|
|
|
} |
|
|
|
balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing) |
|
|
|
if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// balance readable volumes
|
|
|
|
for _, n := range nodes { |
|
|
@ -118,7 +122,11 @@ func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.Dat |
|
|
|
return v.ReadOnly || v.Size >= volumeSizeLimit |
|
|
|
}) |
|
|
|
} |
|
|
|
balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing) |
|
|
|
if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func collectVolumeServersByType(t *master_pb.TopologyInfo) (typeToNodes map[uint64][]*master_pb.DataNodeInfo) { |
|
|
@ -150,7 +158,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) { |
|
|
|
func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { |
|
|
|
selectedVolumeCount := 0 |
|
|
|
for _, dn := range nodes { |
|
|
|
selectedVolumeCount += len(dn.selectedVolumes) |
|
|
@ -177,19 +185,22 @@ func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidates |
|
|
|
|
|
|
|
for _, v := range candidateVolumes { |
|
|
|
if _, found := emptyNode.selectedVolumes[v.Id]; !found { |
|
|
|
moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing) |
|
|
|
delete(fullNode.selectedVolumes, v.Id) |
|
|
|
emptyNode.selectedVolumes[v.Id] = v |
|
|
|
hasMove = true |
|
|
|
break |
|
|
|
if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { |
|
|
|
delete(fullNode.selectedVolumes, v.Id) |
|
|
|
emptyNode.selectedVolumes[v.Id] = v |
|
|
|
hasMove = true |
|
|
|
break |
|
|
|
} else { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) { |
|
|
|
func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { |
|
|
|
collectionPrefix := v.Collection + "_" |
|
|
|
if v.Collection == "" { |
|
|
|
collectionPrefix = "" |
|
|
@ -197,8 +208,9 @@ func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, f |
|
|
|
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) |
|
|
|
if applyBalancing { |
|
|
|
ctx := context.Background() |
|
|
|
LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) |
|
|
|
return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (node *Node) prepareVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { |
|
|
|