
refactor add parallelBalancing

Konstantin Lebedev committed 1 month ago · branch pull/6222/head
commit 46c9cada19
  1. weed/shell/command_volume_balance.go (76)
  2. weed/shell/command_volume_balance_test.go (15)
  3. weed/shell/command_volume_move.go (3)

weed/shell/command_volume_balance.go (76)

@@ -7,6 +7,7 @@ import (
     "io"
     "os"
     "strings"
+    "sync"
     "time"
     "github.com/seaweedfs/seaweedfs/weed/pb"
@@ -24,6 +25,16 @@ func init() {
 }
 type commandVolumeBalance struct {
     commandEnv *CommandEnv
+    applyBalancing *bool
+    parallelBalancing *bool
+    volumeServers []*Node
+    volumeReplicas map[uint32][]*VolumeReplica
+    diskTypes []types.DiskType
+    lock sync.RWMutex
+    wg sync.WaitGroup
+    chErrc chan error
+    chDone chan struct{}
 }
 func (c *commandVolumeBalance) Name() string {
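The new fields hold per-run state plus the coordination primitives for the parallel path: a WaitGroup to join the per-node workers, a buffered error channel, and a done channel. As a rough, self-contained sketch only (the node names and the simulated failure below are invented, this is not code from the commit), such a trio is typically wired together like this:

package main

import (
    "fmt"
    "sync"
)

func main() {
    nodes := []string{"nodeA", "nodeB", "nodeC"} // hypothetical volume servers
    var wg sync.WaitGroup
    chErrc := make(chan error, 1) // buffered: keep the first error, drop the rest
    chDone := make(chan struct{})

    for _, n := range nodes {
        wg.Add(1)
        go func(node string) {
            defer wg.Done()
            // per-node balancing work would go here; report a failure without blocking
            if node == "nodeB" {
                select {
                case chErrc <- fmt.Errorf("move failed on %s", node):
                default:
                }
            }
        }(n)
    }

    go func() {
        wg.Wait()
        close(chDone) // signal that every worker has finished
    }()

    select {
    case err := <-chErrc:
        fmt.Println("stopped with error:", err)
    case <-chDone:
        fmt.Println("all nodes processed")
    }
}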
@@ -77,11 +88,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
     racks := balanceCommand.String("racks", "", "only apply the balancing for this racks")
     nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes")
     noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
-    applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
+    c.applyBalancing = balanceCommand.Bool("force", false, "apply the balancing plan.")
+    c.parallelBalancing = balanceCommand.Bool("parallel", false, "do parallel balancing")
     if err = balanceCommand.Parse(args); err != nil {
         return nil
     }
-    infoAboutSimulationMode(writer, *applyBalancing, "-force")
+    infoAboutSimulationMode(writer, *c.parallelBalancing, "-force")
     if *noLock {
         commandEnv.noLock = true
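With the flag registered, a run can be started from the weed shell. An illustrative invocation: -force and -parallel are the flags defined above, while volume.balance, lock and unlock are pre-existing shell commands; the collection choice is up to the deployment:

> lock
> volume.balance -collection EACH_COLLECTION -force -parallel
> unlock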
@@ -97,22 +109,23 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
         return err
     }
-    volumeServers := collectVolumeServersByDcRackNode(topologyInfo, *dc, *racks, *nodes)
-    volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
-    diskTypes := collectVolumeDiskTypes(topologyInfo)
+    c.commandEnv = commandEnv
+    c.volumeServers = collectVolumeServersByDcRackNode(topologyInfo, *dc, *racks, *nodes)
+    c.volumeReplicas, _ = collectVolumeReplicaLocations(topologyInfo)
+    c.diskTypes = collectVolumeDiskTypes(topologyInfo)
     if *collection == "EACH_COLLECTION" {
         collections, err := ListCollectionNames(commandEnv, true, false)
         if err != nil {
             return err
         }
-        for _, c := range collections {
-            if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, c, *applyBalancing); err != nil {
+        for _, eachCollection := range collections {
+            if err = c.balanceVolumeServers(eachCollection); err != nil {
                 return err
             }
         }
     } else {
-        if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, *collection, *applyBalancing); err != nil {
+        if err = c.balanceVolumeServers(*collection); err != nil {
             return err
         }
     }
@@ -120,10 +133,9 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
     return nil
 }
-func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
-    for _, diskType := range diskTypes {
-        if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, collection, applyBalancing); err != nil {
+func (c *commandVolumeBalance) balanceVolumeServers(collection string) error {
+    for _, diskType := range c.diskTypes {
+        if err := c.balanceVolumeServersByDiskType(diskType, collection); err != nil {
             return err
         }
     }
@@ -131,9 +143,8 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, vo
 }
-func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, collection string, applyBalancing bool) error {
-    for _, n := range nodes {
+func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.DiskType, collection string) error {
+    for _, n := range c.volumeServers {
         n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
             if collection != "ALL_COLLECTIONS" {
                 if v.Collection != collection {
@@ -143,7 +154,7 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskT
             return v.DiskType == string(diskType)
         })
     }
-    if err := balanceSelectedVolume(commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
+    if err := c.balanceSelectedVolume(diskType); err != nil {
         return err
     }
@@ -263,11 +274,18 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
     })
 }
-func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+func (c *commandVolumeBalance) balanceSelectedVolume(diskType types.DiskType) (err error) {
     selectedVolumeCount, volumeMaxCount := 0, float64(0)
     var nodesWithCapacity []*Node
     capacityFunc := capacityByMaxVolumeCount(diskType)
-    for _, dn := range nodes {
+    if *c.parallelBalancing {
+        c.chErrc = make(chan error, 1)
+        c.chDone = make(chan struct{})
+        defer close(c.chErrc)
+        defer close(c.chDone)
+    }
+    for _, dn := range c.volumeServers {
         selectedVolumeCount += len(dn.selectedVolumes)
         capacity := capacityFunc(dn.info)
         if capacity > 0 {
@@ -281,7 +299,6 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
     hasMoved := true
     // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
     for hasMoved {
         hasMoved = false
         slices.SortFunc(nodesWithCapacity, func(a, b *Node) int {
@@ -304,14 +321,17 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
             for _, v := range fullNode.selectedVolumes {
                 candidateVolumes = append(candidateVolumes, v)
             }
-            sortCandidatesFn(candidateVolumes)
+            sortWritableVolumes(candidateVolumes)
+            c.wg.Add(1)
+            go func() {
+                defer c.wg.Done()
             for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] {
                 if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
                     // no more volume servers with empty slots
                     break
                 }
                 fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
-                hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
+                hasMoved, err = c.attemptToMoveOneVolume(fullNode, candidateVolumes, emptyNode)
                 if err != nil {
                     return
                 }
@@ -320,14 +340,24 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
                     break
                 }
             }
+            }()
+            if !*c.parallelBalancing {
+                c.wg.Wait()
+            }
         }
+    if *c.parallelBalancing {
+        c.wg.Wait()
+    }
     return nil
 }
-func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) {
+func (c *commandVolumeBalance) attemptToMoveOneVolume(fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node) (hasMoved bool, err error) {
     for _, v := range candidateVolumes {
-        hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, fullNode, v, emptyNode, applyBalancing)
+        hasMoved, err = maybeMoveOneVolume(c.commandEnv, c.volumeReplicas, fullNode, v, emptyNode, *c.applyBalancing)
         if err != nil {
             return
         }
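Net effect of the flag inside balanceSelectedVolume: every full node's move loop now runs in a goroutine, but in serial mode the WaitGroup is drained right after each spawn, while in parallel mode it is drained once after the loop. A stripped-down sketch of that toggle (the run helper and the empty tasks are hypothetical, not part of the commit):

package main

import "sync"

// run mirrors the join strategy above: wait after each task when parallel is
// false (effectively the old serial behaviour), wait once at the end when true.
func run(parallel bool, tasks []func()) {
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        go func(t func()) {
            defer wg.Done()
            t()
        }(task)
        if !parallel {
            wg.Wait() // serial: join this worker before starting the next one
        }
    }
    if parallel {
        wg.Wait() // parallel: all workers run concurrently, join them here
    }
}

func main() {
    run(true, []func(){func() {}, func() {}, func() {}})
}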

weed/shell/command_volume_balance_test.go (15)

@@ -4,6 +4,7 @@ import (
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/storage/types"
     "github.com/stretchr/testify/assert"
+    "sync"
     "testing"
     "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -254,8 +255,18 @@ func TestBalance(t *testing.T) {
     volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "")
     volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
     diskTypes := collectVolumeDiskTypes(topologyInfo)
-    if err := balanceVolumeServers(nil, diskTypes, volumeReplicas, volumeServers, "ALL_COLLECTIONS", false); err != nil {
+    applyBalancing := false
+    parallelBalancing := false
+    c := commandVolumeBalance{
+        commandEnv:        nil,
+        lock:              sync.RWMutex{},
+        parallelBalancing: &parallelBalancing,
+        applyBalancing:    &applyBalancing,
+        diskTypes:         diskTypes,
+        volumeServers:     volumeServers,
+        volumeReplicas:    volumeReplicas,
+    }
+    if err := c.balanceVolumeServers("ALL_COLLECTIONS"); err != nil {
         t.Errorf("balance: %v", err)
     }
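The updated test drives the refactored entry point through a hand-built commandVolumeBalance value. It can be run in isolation with the standard Go tooling, e.g.:

go test ./weed/shell -run TestBalance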

weed/shell/command_volume_move.go (3)

@@ -4,6 +4,7 @@ import (
     "context"
     "flag"
     "fmt"
+    "github.com/seaweedfs/seaweedfs/weed/util"
     "io"
     "log"
     "time"
@@ -169,7 +170,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
     if resp.LastAppendAtNs != 0 {
         lastAppendAtNs = resp.LastAppendAtNs
     } else {
-        fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes)
+        fmt.Fprintf(writer, "%s => %s volume %d processed %s\n", sourceVolumeServer, targetVolumeServer, volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
     }
 }
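The reworked progress line now names both servers and prints a human-readable size instead of a raw byte count. An illustrative output line (the addresses, volume id, and the exact formatting produced by util.BytesToHumanReadable are invented here):

192.168.1.10:8080 => 192.168.1.11:8080 volume 137 processed 1.2 GiB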
