|
|
@ -43,7 +43,7 @@ func (c *commandVolumeBalance) Help() string { |
|
|
|
} |
|
|
|
pick the volume server A with the lowest number of writable volumes x |
|
|
|
pick the volume server B with the highest number of writable volumes y |
|
|
|
if y > idealWritableVolumes and x +1 < idealWritableVolumes { |
|
|
|
if y > idealWritableVolumes and x +1 <= idealWritableVolumes { |
|
|
|
if B has a writable volume id v that A does not have { |
|
|
|
move writable volume v from A to B |
|
|
|
} |
|
|
@ -60,7 +60,7 @@ func (c *commandVolumeBalance) Help() string { |
|
|
|
func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { |
|
|
|
|
|
|
|
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) |
|
|
|
collection := balanceCommand.String("c", "", "collection name. use \"ALL\" for all collections") |
|
|
|
collection := balanceCommand.String("c", "ALL", "collection name. use \"ALL\" for all collections") |
|
|
|
applyBalancing := balanceCommand.Bool("f", false, "apply the balancing plan.") |
|
|
|
if err = balanceCommand.Parse(args); err != nil { |
|
|
|
return nil |
|
|
@ -81,9 +81,25 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer |
|
|
|
if len(volumeServers) < 2 { |
|
|
|
continue |
|
|
|
} |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
if *collection == "ALL" { |
|
|
|
collections, err := ListCollectionNames(commandEnv) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
for _, c := range collections { |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL", *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} else { |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
@ -98,7 +114,7 @@ func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.Dat |
|
|
|
|
|
|
|
// balance writable volumes
|
|
|
|
for _, n := range nodes { |
|
|
|
n.prepareVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
if collection != "ALL" { |
|
|
|
if v.Collection != collection { |
|
|
|
return false |
|
|
@ -113,7 +129,7 @@ func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.Dat |
|
|
|
|
|
|
|
// balance readable volumes
|
|
|
|
for _, n := range nodes { |
|
|
|
n.prepareVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
if collection != "ALL" { |
|
|
|
if v.Collection != collection { |
|
|
|
return false |
|
|
@ -213,7 +229,7 @@ func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, f |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (node *Node) prepareVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { |
|
|
|
func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { |
|
|
|
node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) |
|
|
|
for _, v := range node.info.VolumeInfos { |
|
|
|
if fn(v) { |
|
|
|