|
|
@ -86,6 +86,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer |
|
|
|
|
|
|
|
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc) |
|
|
|
volumeReplicas, _ := collectVolumeReplicaLocations(resp) |
|
|
|
diskTypes := collectVolumeDiskTypes(resp.TopologyInfo) |
|
|
|
|
|
|
|
if *collection == "EACH_COLLECTION" { |
|
|
|
collections, err := ListCollectionNames(commandEnv, true, false) |
|
|
@ -93,16 +94,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer |
|
|
|
return err |
|
|
|
} |
|
|
|
for _, c := range collections { |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { |
|
|
|
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
} else if *collection == "ALL_COLLECTIONS" { |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { |
|
|
|
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} else { |
|
|
|
if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { |
|
|
|
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -110,9 +111,9 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { |
|
|
|
func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []storage.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { |
|
|
|
|
|
|
|
for _, diskType := range []storage.DiskType{storage.HardDriveType, storage.SsdType} { |
|
|
|
for _, diskType := range diskTypes { |
|
|
|
if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
@ -123,8 +124,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V |
|
|
|
|
|
|
|
func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType storage.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { |
|
|
|
|
|
|
|
// balance writable hdd volumes
|
|
|
|
// fmt.Fprintf(os.Stdout, "\nbalance collection %s writable hdd volumes\n", collection)
|
|
|
|
// balance writable volumes
|
|
|
|
for _, n := range nodes { |
|
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
if collection != "ALL_COLLECTIONS" { |
|
|
@ -139,8 +139,7 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType storage.Dis |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// balance readable hdd volumes
|
|
|
|
// fmt.Fprintf(os.Stdout, "\nbalance collection %s readable hdd volumes\n", collection)
|
|
|
|
// balance readable volumes
|
|
|
|
for _, n := range nodes { |
|
|
|
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { |
|
|
|
if collection != "ALL_COLLECTIONS" { |
|
|
@ -176,6 +175,25 @@ func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter stri |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []storage.DiskType) { |
|
|
|
knownTypes := make(map[string]bool) |
|
|
|
for _, dc := range t.DataCenterInfos { |
|
|
|
for _, r := range dc.RackInfos { |
|
|
|
for _, dn := range r.DataNodeInfos { |
|
|
|
for _, vi := range dn.VolumeInfos { |
|
|
|
if _, found := knownTypes[vi.DiskType]; !found { |
|
|
|
knownTypes[vi.DiskType] = true |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
for diskType, _ := range knownTypes { |
|
|
|
diskTypes = append(diskTypes, storage.ToDiskType(diskType)) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
type Node struct { |
|
|
|
info *master_pb.DataNodeInfo |
|
|
|
selectedVolumes map[uint32]*master_pb.VolumeInformationMessage |
|
|
|