From 01b3125815c1d6abdedf8a8e6cf41e45ba20413c Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 20 Feb 2026 02:30:59 +0500 Subject: [PATCH] [shell]: volume balance capacity by min volume density (#8026) volume balance by min volume density and active volumes --- weed/server/volume_grpc_copy.go | 5 +- weed/shell/command_volume_balance.go | 153 ++++++++++++++++++++++----- weed/shell/commands.go | 1 + weed/storage/disk_location.go | 6 +- weed/util/bytes.go | 2 + weed/util/minfreespace.go | 14 +++ 6 files changed, 149 insertions(+), 32 deletions(-) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 5537cf720..a6c0e5a20 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -67,10 +67,11 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre diskType = req.DiskType } location := vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool { - return location.DiskType == types.ToDiskType(diskType) + return location.DiskType == types.ToDiskType(diskType) && + location.AvailableSpace.Load() > volFileInfoResp.DatFileSize }) if location == nil { - return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString()) + return fmt.Errorf("%s %s", util.ErrVolumeNoSpaceLeft, types.ToDiskType(diskType).ReadableString()) } dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 1a2abf1d0..36cd38051 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -4,6 +4,7 @@ import ( "cmp" "flag" "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "io" "os" "regexp" @@ -21,6 +22,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) +const ( + thresholdVolumeSize = 1.01 + countZeroSelectedVolumes = 0.5 +) + func init() { Commands = append(Commands, &commandVolumeBalance{}) } @@ -28,7 +34,7 @@ func init() { type commandVolumeBalance struct { volumeSizeLimitMb uint64 commandEnv *CommandEnv - writable bool + volumeByActive *bool applyBalancing bool } @@ -84,22 +90,38 @@ func (c *commandVolumeBalance) HasTag(CommandTag) bool { } func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - + allowedVolumeBy := map[string]*bool{ + "ALL": nil, + "ACTIVE": new(bool), + "FULL": new(bool), + } + *allowedVolumeBy["ACTIVE"] = true balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + verbose := balanceCommand.Bool("v", false, "verbose mode") collection := balanceCommand.String("collection", "ALL_COLLECTIONS", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") racks := balanceCommand.String("racks", "", "only apply the balancing for this racks") nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes") - writable := balanceCommand.Bool("writable", false, "only apply the balancing for writable volumes") noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk") applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan.") // TODO: remove this alias applyBalancingAlias := balanceCommand.Bool("force", false, "apply the balancing plan (alias for -apply)") + balanceCommand.Func("volumeBy", "only apply the balancing for ALL volumes and ACTIVE or FULL", func(flagValue string) error { + if flagValue == "" { + return nil + } + for allowed, volumeBy := range allowedVolumeBy { + if flagValue == allowed { + c.volumeByActive = volumeBy + return nil + } + } + return fmt.Errorf("use \"ALL\", \"ACTIVE\" or \"FULL\"") + }) if err = balanceCommand.Parse(args); err != nil { return nil } handleDeprecatedForceFlag(writer, balanceCommand, applyBalancingAlias, applyBalancing) - c.writable = *writable c.applyBalancing = *applyBalancing infoAboutSimulationMode(writer, c.applyBalancing, "-apply") @@ -111,6 +133,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return } } + commandEnv.verbose = *verbose c.commandEnv = commandEnv // collect topology information @@ -182,13 +205,10 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis if v.DiskType != string(diskType) { return false } - if c.writable && v.Size > c.volumeSizeLimitMb { - return false - } - return true + return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb) }) } - if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing); err != nil { + if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.volumeSizeLimitMb, c.applyBalancing); err != nil { return err } @@ -246,6 +266,25 @@ type Node struct { } type CapacityFunc func(*master_pb.DataNodeInfo) float64 +type DensityFunc func(*master_pb.DataNodeInfo) (float64, uint64) + +func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) DensityFunc { + return func(info *master_pb.DataNodeInfo) (float64, uint64) { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0, 0 + } + var volumeSizes uint64 + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeSizes += volumeInfo.Size + } + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte + } + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) + return float64(diskInfo.MaxVolumeCount - int64(usedVolumeCount)), usedVolumeCount + } +} func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { return func(info *master_pb.DataNodeInfo) float64 { @@ -275,12 +314,27 @@ func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { } } -func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { - return float64(len(n.selectedVolumes)) / capacityFunc(n.info) +func (n *Node) localVolumeDensityRatio(capacityFunc DensityFunc) float64 { + capacity, selectedVolumes := capacityFunc(n.info) + if capacity == 0 { + return 0 + } + if selectedVolumes == 0 { + return countZeroSelectedVolumes / capacity + } + return float64(selectedVolumes) / capacity } -func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { - return float64(len(n.selectedVolumes)+1) / capacityFunc(n.info) +func (n *Node) localVolumeDensityNextRatio(capacityFunc DensityFunc) float64 { + capacity, selectedVolumes := capacityFunc(n.info) + if capacity == 0 { + return 0 + } + return float64(selectedVolumes+1) / capacity +} + +func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { + return float64(len(n.selectedVolumes)) / capacityFunc(n.info) } func (n *Node) isOneVolumeOnly() bool { @@ -312,32 +366,51 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { - selectedVolumeCount, volumeMaxCount := 0, float64(0) +func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLimitMb uint64) bool { + if volumeByActive == nil { + return true + } + if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte { + return *volumeByActive + } else { + return !(*volumeByActive) + } +} + +func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), volumeSizeLimitMb uint64, applyBalancing bool) (err error) { + selectedVolumeCount, volumeCapacities := uint64(0), float64(0) var nodesWithCapacity []*Node - capacityFunc := capacityByMaxVolumeCount(diskType) + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte + } + capacityFunc := capacityByMinVolumeDensity(diskType, volumeSizeLimitMb) for _, dn := range nodes { - selectedVolumeCount += len(dn.selectedVolumes) - capacity := capacityFunc(dn.info) + capacity, volumeCount := capacityFunc(dn.info) if capacity > 0 { nodesWithCapacity = append(nodesWithCapacity, dn) } - volumeMaxCount += capacity + volumeCapacities += capacity + selectedVolumeCount += volumeCount } - - idealVolumeRatio := float64(selectedVolumeCount) / volumeMaxCount + if volumeCapacities == 0 { + return nil + } + idealVolumeRatio := float64(selectedVolumeCount) / volumeCapacities hasMoved := true - // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio) - + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, cap:%d, idealVolumeRatio %f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), idealVolumeRatio*100) + } for hasMoved { hasMoved = false slices.SortFunc(nodesWithCapacity, func(a, b *Node) int { - return cmp.Compare(a.localVolumeRatio(capacityFunc), b.localVolumeRatio(capacityFunc)) + return cmp.Compare(a.localVolumeDensityRatio(capacityFunc), b.localVolumeDensityRatio(capacityFunc)) }) if len(nodesWithCapacity) == 0 { - fmt.Printf("no volume server found with capacity for %s", diskType.ReadableString()) + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString()) + } return nil } @@ -345,6 +418,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu var fullNodeIndex int for fullNodeIndex = len(nodesWithCapacity) - 1; fullNodeIndex >= 0; fullNodeIndex-- { fullNode = nodesWithCapacity[fullNodeIndex] + if len(fullNode.selectedVolumes) == 0 { + continue + } if !fullNode.isOneVolumeOnly() { break } @@ -353,15 +429,34 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu for _, v := range fullNode.selectedVolumes { candidateVolumes = append(candidateVolumes, v) } + if fullNodeIndex == -1 { + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "no nodes with capacity found for %s, nodes %d", diskType.ReadableString(), len(nodesWithCapacity)) + } + return nil + } sortCandidatesFn(candidateVolumes) for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] { - if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) { - // no more volume servers with empty slots + if !(fullNode.localVolumeDensityNextRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeDensityNextRatio(capacityFunc) <= idealVolumeRatio) { + if commandEnv != nil && commandEnv.verbose { + fmt.Printf("no more volume servers with empty slots %s, idealVolumeRatio %f\n", emptyNode.info.Id, idealVolumeRatio) + } break } - fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc)) + fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, + fullNode.localVolumeDensityRatio(capacityFunc), emptyNode.localVolumeDensityNextRatio(capacityFunc)) + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "%s %.1f %.1f:%.1f\t", diskType.ReadableString(), idealVolumeRatio*100, + fullNode.localVolumeDensityRatio(capacityFunc)*100, emptyNode.localVolumeDensityNextRatio(capacityFunc)*100) + } hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing) if err != nil { + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "attempt to move one volume error %+v\n", err) + } + if strings.Contains(err.Error(), util.ErrVolumeNoSpaceLeft) { + continue + } return } if hasMoved { @@ -420,7 +515,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, false) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, v.ReadOnly) } return nil } diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 55a09e392..d34204878 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -38,6 +38,7 @@ type CommandEnv struct { option *ShellOptions locker *exclusive_locks.ExclusiveLocker noLock bool + verbose bool } func NewCommandEnv(options *ShellOptions) *CommandEnv { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index c7a9c82f2..3935861c8 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -33,6 +34,7 @@ type DiskLocation struct { MaxVolumeCount int32 OriginalMaxVolumeCount int32 MinFreeSpace util.MinFreeSpace + AvailableSpace atomic.Uint64 volumes map[needle.VolumeId]*Volume volumesLock sync.RWMutex @@ -513,10 +515,12 @@ func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) func (l *DiskLocation) CheckDiskSpace() { if dir, e := filepath.Abs(l.Directory); e == nil { s := stats.NewDiskStatus(dir) + available := l.MinFreeSpace.AvailableSpace(s.Free, s.All) stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All)) stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used)) stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free)) - + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "avail").Set(float64(available)) + l.AvailableSpace.Store(available) isLow, desc := l.MinFreeSpace.IsLow(s.Free, s.PercentFree) if isLow != l.isDiskSpaceLow { l.isDiskSpaceLow = !l.isDiskSpaceLow diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 482dc3a48..faf7df916 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -262,3 +262,5 @@ const ( PByte = TByte * 1000 EByte = PByte * 1000 ) + +const ErrVolumeNoSpaceLeft = "no space left for disk type" diff --git a/weed/util/minfreespace.go b/weed/util/minfreespace.go index 9fd041ddc..d06981338 100644 --- a/weed/util/minfreespace.go +++ b/weed/util/minfreespace.go @@ -44,6 +44,20 @@ func (s MinFreeSpace) IsLow(freeBytes uint64, freePercent float32) (yes bool, de return false, "" } +func (s MinFreeSpace) AvailableSpace(freeBytes uint64, totalBytes uint64) uint64 { + var minFreeSpaceBytes uint64 + switch s.Type { + case AsPercent: + minFreeSpaceBytes = uint64((float32(totalBytes) * s.Percent) / 100) + case AsBytes: + minFreeSpaceBytes = s.Bytes + } + if minFreeSpaceBytes > freeBytes { + return 0 + } + return freeBytes - minFreeSpaceBytes +} + // String returns a string representation of MinFreeSpace. func (s MinFreeSpace) String() string { switch s.Type {