diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 8351415e2..9805d7678 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -24,6 +25,16 @@ func init() {
 }
 
 type commandVolumeBalance struct {
+	commandEnv        *CommandEnv
+	applyBalancing    *bool
+	parallelBalancing *bool
+	volumeServers     []*Node
+	volumeReplicas    map[uint32][]*VolumeReplica
+	diskTypes         []types.DiskType
+	lock              sync.RWMutex
+	wg                sync.WaitGroup
+	chErrc            chan error
+	chDone            chan struct{}
 }
 
 func (c *commandVolumeBalance) Name() string {
@@ -77,11 +88,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
 	racks := balanceCommand.String("racks", "", "only apply the balancing for this racks")
 	nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes")
 	noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
-	applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
+	c.applyBalancing = balanceCommand.Bool("force", false, "apply the balancing plan.")
+	c.parallelBalancing = balanceCommand.Bool("parallel", false, "do parallel balancing")
 	if err = balanceCommand.Parse(args); err != nil {
 		return nil
 	}
-	infoAboutSimulationMode(writer, *applyBalancing, "-force")
+	infoAboutSimulationMode(writer, *c.applyBalancing, "-force")
 
 	if *noLock {
 		commandEnv.noLock = true
@@ -97,22 +109,23 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
 		return err
 	}
 
-	volumeServers := collectVolumeServersByDcRackNode(topologyInfo, *dc, *racks, *nodes)
-	volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
-	diskTypes := collectVolumeDiskTypes(topologyInfo)
+	c.commandEnv = commandEnv
+	c.volumeServers = collectVolumeServersByDcRackNode(topologyInfo, *dc, *racks, *nodes)
+	c.volumeReplicas, _ = collectVolumeReplicaLocations(topologyInfo)
+	c.diskTypes = collectVolumeDiskTypes(topologyInfo)
 
 	if *collection == "EACH_COLLECTION" {
 		collections, err := ListCollectionNames(commandEnv, true, false)
 		if err != nil {
 			return err
 		}
-		for _, c := range collections {
-			if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, c, *applyBalancing); err != nil {
+		for _, eachCollection := range collections {
+			if err = c.balanceVolumeServers(eachCollection); err != nil {
 				return err
 			}
 		}
 	} else {
-		if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, *collection, *applyBalancing); err != nil {
+		if err = c.balanceVolumeServers(*collection); err != nil {
 			return err
 		}
 	}
@@ -120,10 +133,9 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
 	return nil
 }
 
-func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
-
-	for _, diskType := range diskTypes {
-		if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, collection, applyBalancing); err != nil {
+func (c *commandVolumeBalance) balanceVolumeServers(collection string) error {
+	for _, diskType := range c.diskTypes {
+		if err := c.balanceVolumeServersByDiskType(diskType, collection); err != nil {
 			return err
 		}
 	}
@@ -131,9 +143,8 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo
 	return nil
 }
 
-func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
-
-	for _, n := range nodes {
+func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.DiskType, collection string) error {
+	for _, n := range c.volumeServers {
 		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
 			if collection != "ALL_COLLECTIONS" {
 				if v.Collection != collection {
@@ -143,7 +154,7 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT
 			return v.DiskType == string(diskType)
 		})
 	}
-	if err := balanceSelectedVolume(commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
+	if err := c.balanceSelectedVolume(diskType); err != nil {
 		return err
 	}
 
@@ -263,11 +274,18 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
 	})
 }
 
-func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+func (c *commandVolumeBalance) balanceSelectedVolume(diskType types.DiskType) (err error) {
 	selectedVolumeCount, volumeMaxCount := 0, float64(0)
 	var nodesWithCapacity []*Node
 	capacityFunc := capacityByMaxVolumeCount(diskType)
-	for _, dn := range nodes {
+	if *c.parallelBalancing {
+		c.chErrc = make(chan error, 1)
+		c.chDone = make(chan struct{})
+		defer close(c.chErrc)
+		defer close(c.chDone)
+	}
+
+	for _, dn := range c.volumeServers {
 		selectedVolumeCount += len(dn.selectedVolumes)
 		capacity := capacityFunc(dn.info)
 		if capacity > 0 {
@@ -281,7 +299,6 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
 
 	hasMoved := true
 	// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
-
 	for hasMoved {
 		hasMoved = false
 		slices.SortFunc(nodesWithCapacity, func(a, b *Node) int {
@@ -304,30 +321,43 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
 		for _, v := range fullNode.selectedVolumes {
 			candidateVolumes = append(candidateVolumes, v)
 		}
-		sortCandidatesFn(candidateVolumes)
-		for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] {
-			if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
-				// no more volume servers with empty slots
-				break
-			}
-			fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
-			hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
-			if err != nil {
-				return
-			}
-			if hasMoved {
-				// moved one volume
-				break
+		sortWritableVolumes(candidateVolumes)
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+			for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] {
+				if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
+					// no more volume servers with empty slots
+					break
+				}
+				fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
+				hasMoved, err = c.attemptToMoveOneVolume(fullNode, candidateVolumes, emptyNode)
+				if err != nil {
+					return
+				}
+				if hasMoved {
+					// moved one volume
+					break
+				}
 			}
+		}()
+
+		if !*c.parallelBalancing {
+			c.wg.Wait()
 		}
 	}
+
+	if *c.parallelBalancing {
+		c.wg.Wait()
+	}
+
 	return nil
 }
 
-func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
+func (c *commandVolumeBalance) attemptToMoveOneVolume(fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node) (hasMoved bool, err error) {
 
 	for _, v := range candidateVolumes {
-		hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, fullNode, v, emptyNode, applyBalancing)
+		hasMoved, err = maybeMoveOneVolume(c.commandEnv, c.volumeReplicas, fullNode, v, emptyNode, *c.applyBalancing)
 		if err != nil {
 			return
 		}
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
index 4e60f6ff8..77902ff6c 100644
--- a/weed/shell/command_volume_balance_test.go
+++ b/weed/shell/command_volume_balance_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/stretchr/testify/assert"
+	"sync"
 	"testing"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -254,8 +255,18 @@ func TestBalance(t *testing.T) {
 	volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "")
 	volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
 	diskTypes := collectVolumeDiskTypes(topologyInfo)
-
-	if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, "ALL_COLLECTIONS", false); err != nil {
+	applyBalancing := false
+	parallelBalancing := false
+	c := commandVolumeBalance{
+		commandEnv:        nil,
+		lock:              sync.RWMutex{},
+		parallelBalancing: &parallelBalancing,
+		applyBalancing:    &applyBalancing,
+		diskTypes:         diskTypes,
+		volumeServers:     volumeServers,
+		volumeReplicas:    volumeReplicas,
+	}
+	if err := c.balanceVolumeServers("ALL_COLLECTIONS"); err != nil {
 		t.Errorf("balance: %v", err)
 	}
 
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index cf9991695..c4ce5dfaf 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"io"
 	"log"
 	"time"
@@ -169,7 +170,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
 			if resp.LastAppendAtNs != 0 {
 				lastAppendAtNs = resp.LastAppendAtNs
 			} else {
-				fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes)
+				fmt.Fprintf(writer, "%s => %s volume %d processed %s\n", sourceVolumeServer, targetVolumeServer, volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
 			}
 		}
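
For reference, the reworked balanceSelectedVolume drives sequential and parallel runs with the same goroutine-per-source-node loop and only changes where wg.Wait() happens: inside the loop when -parallel is off, once after the loop when it is on. The self-contained sketch below illustrates that gating in isolation; it is not code from the patch. moveOneVolume, sources, and targets are hypothetical stand-ins for attemptToMoveOneVolume and the node lists, and results are funneled through a buffered error channel here rather than the shared hasMoved/err variables the patch assigns from inside each goroutine.

package main

import (
	"fmt"
	"sync"
)

// moveOneVolume is a hypothetical stand-in for attemptToMoveOneVolume in the patch.
func moveOneVolume(source, target string) error {
	fmt.Printf("moving one volume from %s to %s\n", source, target)
	return nil
}

// balance starts one worker per source node. With parallel=false it waits after
// each launch (the original sequential behavior); with parallel=true it waits
// once after all workers have been started, mirroring the -parallel flag.
func balance(sources, targets []string, parallel bool) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(sources)) // buffered: each worker sends at most one error

	for _, source := range sources {
		source := source // capture the loop variable for the goroutine (pre-Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, target := range targets {
				if err := moveOneVolume(source, target); err != nil {
					errs <- err
					return
				}
			}
		}()

		if !parallel {
			wg.Wait() // sequential mode: finish this source node before starting the next
		}
	}

	if parallel {
		wg.Wait() // parallel mode: all source nodes drain concurrently
	}

	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	sources := []string{"vs1:8080", "vs2:8080"}
	targets := []string{"vs3:8080"}
	if err := balance(sources, targets, true); err != nil {
		fmt.Println("balance failed:", err)
	}
}

Collecting results through the channel keeps each worker's state isolated once several source nodes drain at the same time, which is one way to avoid concurrent writes to shared locals when -parallel is enabled.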