From 5272ede271bbdd5d8740935da96d90e91e480efc Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 14 Jan 2026 19:07:42 +0500 Subject: [PATCH 1/6] chore: balance capacity by free, max, disk usage --- weed/shell/command_volume_balance.go | 168 +++++++++++++++---- weed/shell/command_volume_fix_replication.go | 11 +- weed/shell/command_volume_server_evacuate.go | 7 +- weed/shell/command_volume_tier_move.go | 9 +- weed/shell/commands.go | 1 + 5 files changed, 154 insertions(+), 42 deletions(-) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 1a2abf1d0..9c21a0d18 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -6,7 +6,9 @@ import ( "fmt" "io" "os" + "reflect" "regexp" + "runtime" "strings" "time" @@ -25,10 +27,16 @@ func init() { Commands = append(Commands, &commandVolumeBalance{}) } +var ( + capacityByFunc = capacityByMaxVolumeCount +) + +const funcNameCapByMinVolumeDensity = "capacityByMinVolumeDensity.func1" + type commandVolumeBalance struct { volumeSizeLimitMb uint64 commandEnv *CommandEnv - writable bool + volumeByActive *bool applyBalancing bool } @@ -84,22 +92,55 @@ func (c *commandVolumeBalance) HasTag(CommandTag) bool { } func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - + allowedCapacityBy := map[string]CapacityByFunc{ + "MAX_VOLUME_COUNT": capacityByMaxVolumeCount, + "FREE_VOLUME_COUNT": capacityByFreeVolumeCount, + "MIN_VOLUME_DENSITY": capacityByMinVolumeDensity, + } + allowedVolumeBy := map[string]*bool{ + "ALL": nil, + "ACTIVE": new(bool), + "FULL": new(bool), + } + *allowedVolumeBy["ACTIVE"] = true balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + verbose := balanceCommand.Bool("v", false, "verbose mode") collection := balanceCommand.String("collection", "ALL_COLLECTIONS", "collection name, or use \"ALL_COLLECTIONS\" across 
collections, \"EACH_COLLECTION\" for each collection") dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") racks := balanceCommand.String("racks", "", "only apply the balancing for this racks") nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes") - writable := balanceCommand.Bool("writable", false, "only apply the balancing for writable volumes") noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk") applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan.") // TODO: remove this alias applyBalancingAlias := balanceCommand.Bool("force", false, "apply the balancing plan (alias for -apply)") + balanceCommand.Func("volumeBy", "only apply the balancing for ALL volumes and ACTIVE or FULL", func(flagValue string) error { + if flagValue == "" { + return nil + } + for allowed, volumeBy := range allowedVolumeBy { + if flagValue == allowed { + c.volumeByActive = volumeBy + return nil + } + } + return fmt.Errorf("use \"ALL\", \"ACTIVE\" or \"FULL\"") + }) + balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error { + if flagValue == "" { + return nil + } + for allowed, allowedCapacityByFunc := range allowedCapacityBy { + if flagValue == allowed { + capacityByFunc = allowedCapacityByFunc + return nil + } + } + return fmt.Errorf("use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" or \"MIN_VOLUME_DENSITY\"") + }) if err = balanceCommand.Parse(args); err != nil { return nil } handleDeprecatedForceFlag(writer, balanceCommand, applyBalancingAlias, applyBalancing) - c.writable = *writable c.applyBalancing = *applyBalancing infoAboutSimulationMode(writer, c.applyBalancing, "-apply") @@ -111,6 +152,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return } } + commandEnv.verbose = *verbose c.commandEnv = 
commandEnv // collect topology information @@ -182,13 +224,10 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis if v.DiskType != string(diskType) { return false } - if c.writable && v.Size > c.volumeSizeLimitMb { - return false - } - return true + return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb) }) } - if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing); err != nil { + if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.volumeByActive); err != nil { return err } @@ -245,42 +284,76 @@ type Node struct { rack string } -type CapacityFunc func(*master_pb.DataNodeInfo) float64 +type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64) +type CapacityByFunc func(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc -func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { - return func(info *master_pb.DataNodeInfo) float64 { +func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { + return func(info *master_pb.DataNodeInfo) (float64, uint64) { diskInfo, found := info.DiskInfos[string(diskType)] if !found { - return 0 + return 0, 0 + } + var volumeSizes uint64 + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeSizes += volumeInfo.Size } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } - return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount + } +} + +func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { + return func(info 
*master_pb.DataNodeInfo) (float64, uint64) { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0, 0 + } + var volumeSizes uint64 + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeSizes += volumeInfo.Size + } + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount } } -func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { - return func(info *master_pb.DataNodeInfo) float64 { +func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { + return func(info *master_pb.DataNodeInfo) (float64, uint64) { diskInfo, found := info.DiskInfos[string(diskType)] if !found { - return 0 + return 0, 0 + } + var volumeSizes uint64 + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeSizes += volumeInfo.Size } var ecShardCount int for _, ecShardInfo := range diskInfo.EcShardInfos { ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } - return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount } } func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { - return float64(len(n.selectedVolumes)) / capacityFunc(n.info) + capacity, used := capacityFunc(n.info) + if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { + return float64(used) / capacity + } + return float64(len(n.selectedVolumes)) / capacity } func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { - return float64(len(n.selectedVolumes)+1) / capacityFunc(n.info) + capacity, used := capacityFunc(n.info) + if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { + return float64(used+1) / 
capacity + } + return float64(len(n.selectedVolumes)+1) / capacity } func (n *Node) isOneVolumeOnly() bool { @@ -312,24 +385,47 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { - selectedVolumeCount, volumeMaxCount := 0, float64(0) +func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLimitMb uint64) bool { + if volumeByActive == nil { + return true + } + if (volumeSize*101)/100 < volumeSizeLimitMb*1024*1024 { + return *volumeByActive + } else { + return !(*volumeByActive) + } +} + +func getFuncName(f interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() +} + +func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { + selectedVolumeCount, volumeCapacities := 0, float64(0) + var volumeUsageOnLimit uint64 var nodesWithCapacity []*Node - capacityFunc := capacityByMaxVolumeCount(diskType) + capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) - capacity := capacityFunc(dn.info) + capacity, used := capacityFunc(dn.info) if capacity > 0 { nodesWithCapacity = append(nodesWithCapacity, dn) } - volumeMaxCount += capacity + volumeCapacities += capacity + volumeUsageOnLimit += used + } + var idealVolumeRatio float64 + if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { + idealVolumeRatio = float64(volumeUsageOnLimit) / volumeCapacities + } else { + idealVolumeRatio = float64(selectedVolumeCount) / volumeCapacities } 
- - idealVolumeRatio := float64(selectedVolumeCount) / volumeMaxCount hasMoved := true - // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio) + if commandEnv.verbose { + fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, usage on limit: %d, idealVolumeRatio %f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), volumeUsageOnLimit, idealVolumeRatio) + } for hasMoved { hasMoved = false @@ -337,7 +433,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu return cmp.Compare(a.localVolumeRatio(capacityFunc), b.localVolumeRatio(capacityFunc)) }) if len(nodesWithCapacity) == 0 { - fmt.Printf("no volume server found with capacity for %s", diskType.ReadableString()) + fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString()) return nil } @@ -345,6 +441,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu var fullNodeIndex int for fullNodeIndex = len(nodesWithCapacity) - 1; fullNodeIndex >= 0; fullNodeIndex-- { fullNode = nodesWithCapacity[fullNodeIndex] + if len(fullNode.selectedVolumes) == 0 { + continue + } if !fullNode.isOneVolumeOnly() { break } @@ -356,12 +455,19 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu sortCandidatesFn(candidateVolumes) for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] { if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) { - // no more volume servers with empty slots + if commandEnv.verbose { + fmt.Printf("no more volume servers with empty slots %s, idealVolumeRatio %f\n", emptyNode.info.Id, idealVolumeRatio) + } break } - fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc)) 
+ if commandEnv.verbose { + fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc)) + } hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing) if err != nil { + if commandEnv.verbose { + fmt.Fprintf(os.Stdout, "attempt to move one volume error %+v\n", err) + } return } if hasMoved { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 9c760ef9d..cbc87563b 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -351,10 +351,11 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co foundNewLocation := false hasSkippedCollection := false keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType), 0) for _, dst := range allLocations { + freeVolumeCount, _ := fn(dst.dataNode) // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + if freeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) @@ -427,9 +428,11 @@ func addVolumeCount(info *master_pb.DiskInfo, count int) { } func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { - fn := capacityByFreeVolumeCount(diskType) + fn := capacityByFreeVolumeCount(diskType, 0) slices.SortFunc(dataNodes, func(a, b location) int { - return int(fn(b.dataNode) - fn(a.dataNode)) + freeVolumeCountA, _ := fn(a.dataNode) + freeVolumeCountB, _ := fn(b.dataNode) + return 
int(freeVolumeCountB - freeVolumeCountA) }) } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 04e982f93..428534060 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -260,8 +260,8 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv } func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { - freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) - maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType)) + freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType), 0) + maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType), 0) for _, n := range otherNodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { return v.DiskType == vol.DiskType @@ -273,7 +273,8 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][ }) for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] - if freeVolumeCountfn(emptyNode.info) <= 0 { + freeVolumeCount, _ := freeVolumeCountfn(emptyNode.info) + if freeVolumeCount <= 0 { continue } hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index cfa2eaef3..65e7ec432 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -147,7 +147,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer for _, vid := range volumeIds { collection := volumeIdToCollection[vid] - if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil { + if err = 
c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations, volumeSizeLimitMb); err != nil { fmt.Printf("tier move volume %d: %v\n", vid, err) } allLocations = rotateDataNodes(allLocations) @@ -196,7 +196,7 @@ func isOneOf(server string, locations []wdclient.Location) bool { return false } -func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) { +func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location, volumeSizeLimitMb uint64) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid)) if !found { @@ -205,9 +205,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer // find one server with the most empty volume slots with target disk type hasFoundTarget := false - fn := capacityByFreeVolumeCount(toDiskType) + fn := capacityByFreeVolumeCount(toDiskType, volumeSizeLimitMb) for _, dst := range allLocations { - if fn(dst.dataNode) > 0 && !hasFoundTarget { + freeVolumeCount, _ := fn(dst.dataNode) + if freeVolumeCount > 0 && !hasFoundTarget { // ask the volume server to replicate the volume if isOneOf(dst.dataNode.Id, locations) { continue diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 55a09e392..d34204878 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -38,6 +38,7 @@ type CommandEnv struct { option *ShellOptions locker *exclusive_locks.ExclusiveLocker noLock bool + verbose bool } func NewCommandEnv(options *ShellOptions) *CommandEnv { From 59d12281ffb21808aa704febe44053313914d820 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:06:06 +0500 Subject: [PATCH 2/6] fix balance test --- 
weed/shell/command_volume_balance.go | 20 +++++++++++++++----- weed/shell/command_volume_balance_test.go | 4 ++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 9c21a0d18..879f58c05 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -301,7 +302,11 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) for _, ecShardInfo := range diskInfo.EcShardInfos { ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + } usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount } } @@ -335,6 +340,9 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 for _, ecShardInfo := range diskInfo.EcShardInfos { ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + } usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount } @@ -423,7 +431,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu hasMoved := true - if commandEnv.verbose { + if commandEnv != nil && commandEnv.verbose { fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, usage on limit: %d, idealVolumeRatio 
%f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), volumeUsageOnLimit, idealVolumeRatio) } @@ -433,7 +441,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu return cmp.Compare(a.localVolumeRatio(capacityFunc), b.localVolumeRatio(capacityFunc)) }) if len(nodesWithCapacity) == 0 { - fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString()) + if commandEnv != nil && commandEnv.verbose { + fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString()) + } return nil } @@ -455,17 +465,17 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu sortCandidatesFn(candidateVolumes) for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] { if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) { - if commandEnv.verbose { + if commandEnv != nil && commandEnv.verbose { fmt.Printf("no more volume servers with empty slots %s, idealVolumeRatio %f\n", emptyNode.info.Id, idealVolumeRatio) } break } - if commandEnv.verbose { + if commandEnv != nil && commandEnv.verbose { fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc)) } hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing) if err != nil { - if commandEnv.verbose { + if commandEnv != nil && commandEnv.verbose { fmt.Fprintf(os.Stdout, "attempt to move one volume error %+v\n", err) } return diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 99fdf5575..6c0a8d065 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -20,6 +20,10 @@ type testMoveCase struct { expected bool } +func stringPtr(s string) 
*string { + return &s +} + func TestIsGoodMove(t *testing.T) { var tests = []testMoveCase{ From d1db6123059f4284be6a83a19fbb00f2a74294b8 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:54:35 +0500 Subject: [PATCH 3/6] avoid getFuncName --- weed/shell/command_volume_balance.go | 55 +++++++++----------- weed/shell/command_volume_fix_replication.go | 6 +-- weed/shell/command_volume_server_evacuate.go | 2 +- weed/shell/command_volume_tier_move.go | 2 +- 4 files changed, 29 insertions(+), 36 deletions(-) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 879f58c05..8a7b53f82 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -6,9 +6,7 @@ import ( "fmt" "io" "os" - "reflect" "regexp" - "runtime" "strings" "time" @@ -285,14 +283,14 @@ type Node struct { rack string } -type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64) +type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64, bool) type CapacityByFunc func(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { - return func(info *master_pb.DataNodeInfo) (float64, uint64) { + return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { diskInfo, found := info.DiskInfos[string(diskType)] if !found { - return 0, 0 + return 0, 0, false } var volumeSizes uint64 for _, volumeInfo := range diskInfo.VolumeInfos { @@ -307,30 +305,30 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) } usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) - return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount + return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } } func 
capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { - return func(info *master_pb.DataNodeInfo) (float64, uint64) { + return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { diskInfo, found := info.DiskInfos[string(diskType)] if !found { - return 0, 0 + return 0, 0, true } var volumeSizes uint64 for _, volumeInfo := range diskInfo.VolumeInfos { volumeSizes += volumeInfo.Size } usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) - return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount + return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true } } func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { - return func(info *master_pb.DataNodeInfo) (float64, uint64) { + return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { diskInfo, found := info.DiskInfos[string(diskType)] if !found { - return 0, 0 + return 0, 0, false } var volumeSizes uint64 for _, volumeInfo := range diskInfo.VolumeInfos { @@ -344,21 +342,21 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 } usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) - return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount + return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } } func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { - capacity, used := capacityFunc(n.info) - if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { + capacity, used, isDensityBased := capacityFunc(n.info) + if isDensityBased { return float64(used) / capacity } return float64(len(n.selectedVolumes)) / capacity } func (n *Node) localVolumeNextRatio(capacityFunc 
CapacityFunc) float64 { - capacity, used := capacityFunc(n.info) - if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { + capacity, used, isDensityBased := capacityFunc(n.info) + if isDensityBased { return float64(used+1) / capacity } return float64(len(n.selectedVolumes)+1) / capacity @@ -404,35 +402,30 @@ func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLi } } -func getFuncName(f interface{}) string { - return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() -} - func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { - selectedVolumeCount, volumeCapacities := 0, float64(0) - var volumeUsageOnLimit uint64 + ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0) var nodesWithCapacity []*Node capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) for _, dn := range nodes { - selectedVolumeCount += len(dn.selectedVolumes) - capacity, used := capacityFunc(dn.info) + capacity, volumeUsed, isDensityBased := capacityFunc(dn.info) if capacity > 0 { nodesWithCapacity = append(nodesWithCapacity, dn) } + if isDensityBased { + ratioVolumeCount += volumeUsed + } else { + ratioVolumeCount += uint64(len(dn.selectedVolumes)) + } volumeCapacities += capacity - volumeUsageOnLimit += used } - var idealVolumeRatio float64 - if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { - idealVolumeRatio = float64(volumeUsageOnLimit) / volumeCapacities - } else { - idealVolumeRatio = float64(selectedVolumeCount) / volumeCapacities + if volumeCapacities > 0 { + idealVolumeRatio = float64(ratioVolumeCount) / volumeCapacities } hasMoved := true if commandEnv != nil && commandEnv.verbose { - fmt.Fprintf(os.Stdout, "selected nodes %d, 
volumes:%d, max:%d, usage on limit: %d, idealVolumeRatio %f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), volumeUsageOnLimit, idealVolumeRatio) + fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, idealVolumeRatio %f\n", len(nodesWithCapacity), ratioVolumeCount, int64(volumeCapacities), idealVolumeRatio) } for hasMoved { diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index cbc87563b..be3136598 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -353,7 +353,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType), 0) for _, dst := range allLocations { - freeVolumeCount, _ := fn(dst.dataNode) + freeVolumeCount, _, _ := fn(dst.dataNode) // check whether data nodes satisfy the constraints if freeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern @@ -430,8 +430,8 @@ func addVolumeCount(info *master_pb.DiskInfo, count int) { func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { fn := capacityByFreeVolumeCount(diskType, 0) slices.SortFunc(dataNodes, func(a, b location) int { - freeVolumeCountA, _ := fn(a.dataNode) - freeVolumeCountB, _ := fn(b.dataNode) + freeVolumeCountA, _, _ := fn(a.dataNode) + freeVolumeCountB, _, _ := fn(b.dataNode) return int(freeVolumeCountB - freeVolumeCountA) }) } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 428534060..ba51e65f8 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -273,7 +273,7 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][ }) for i := 0; i < len(otherNodes); i++ { 
emptyNode := otherNodes[i] - freeVolumeCount, _ := freeVolumeCountfn(emptyNode.info) + freeVolumeCount, _, _ := freeVolumeCountfn(emptyNode.info) if freeVolumeCount <= 0 { continue } diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 65e7ec432..8ad824ff6 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -207,7 +207,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer hasFoundTarget := false fn := capacityByFreeVolumeCount(toDiskType, volumeSizeLimitMb) for _, dst := range allLocations { - freeVolumeCount, _ := fn(dst.dataNode) + freeVolumeCount, _, _ := fn(dst.dataNode) if freeVolumeCount > 0 && !hasFoundTarget { // ask the volume server to replicate the volume if isOneOf(dst.dataNode.Id, locations) { From 26ceded15ac51c34f73a4ad2daf17f176cd8db95 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Jan 2026 17:29:50 +0500 Subject: [PATCH 4/6] resolve conversation --- weed/shell/command_volume_balance.go | 31 ++++++++++++----------- weed/shell/command_volume_balance_test.go | 11 ++++---- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 8a7b53f82..1a0db8f88 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -26,17 +26,14 @@ func init() { Commands = append(Commands, &commandVolumeBalance{}) } -var ( - capacityByFunc = capacityByMaxVolumeCount -) - -const funcNameCapByMinVolumeDensity = "capacityByMinVolumeDensity.func1" +const thresholdVolumeSize = 1.01 type commandVolumeBalance struct { - volumeSizeLimitMb uint64 commandEnv *CommandEnv + capacityByFunc CapacityByFunc volumeByActive *bool applyBalancing bool + volumeSizeLimitMb uint64 } func (c *commandVolumeBalance) Name() string { @@ -126,11 +123,12 @@ func (c *commandVolumeBalance) Do(args 
[]string, commandEnv *CommandEnv, writer }) balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error { if flagValue == "" { + c.capacityByFunc = capacityByMaxVolumeCount return nil } for allowed, allowedCapacityByFunc := range allowedCapacityBy { if flagValue == allowed { - capacityByFunc = allowedCapacityByFunc + c.capacityByFunc = allowedCapacityByFunc return nil } } @@ -226,7 +224,7 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb) }) } - if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.volumeByActive); err != nil { + if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.capacityByFunc); err != nil { return err } @@ -301,9 +299,9 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } if volumeSizeLimitMb == 0 { - volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } @@ -319,7 +317,10 @@ func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint6 for _, volumeInfo := range diskInfo.VolumeInfos { volumeSizes += volumeInfo.Size } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte + } + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) 
return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true } } @@ -339,9 +340,9 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } if volumeSizeLimitMb == 0 { - volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } } @@ -395,14 +396,14 @@ func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLi if volumeByActive == nil { return true } - if (volumeSize*101)/100 < volumeSizeLimitMb*1024*1024 { + if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte { return *volumeByActive } else { return !(*volumeByActive) } } -func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { +func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, capacityByFunc CapacityByFunc) (err error) { ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0) var nodesWithCapacity []*Node capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 6c0a8d065..bdecf4be9 100644 --- a/weed/shell/command_volume_balance_test.go +++ 
b/weed/shell/command_volume_balance_test.go @@ -20,10 +20,6 @@ type testMoveCase struct { expected bool } -func stringPtr(s string) *string { - return &s -} - func TestIsGoodMove(t *testing.T) { var tests = []testMoveCase{ @@ -259,11 +255,14 @@ func TestBalance(t *testing.T) { volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "") volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) diskTypes := collectVolumeDiskTypes(topologyInfo) - c := &commandVolumeBalance{} + c := &commandVolumeBalance{capacityByFunc: capacityByMaxVolumeCount} + if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil { + t.Errorf("balance: %v", err) + } + c = &commandVolumeBalance{capacityByFunc: capacityByMinVolumeDensity, volumeByActive: new(bool)} if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil { t.Errorf("balance: %v", err) } - } func TestVolumeSelection(t *testing.T) { From 4e9b2adf86faad9cd2652c5930529be700d220be Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Jan 2026 20:25:33 +0500 Subject: [PATCH 5/6] Division by zero if capacity is 0 produces Inf. 
--- weed/shell/command_volume_balance.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 1a0db8f88..0643b9bde 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -349,6 +349,9 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { capacity, used, isDensityBased := capacityFunc(n.info) + if capacity == 0 { + return 0 + } if isDensityBased { return float64(used) / capacity } @@ -357,6 +360,9 @@ func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { capacity, used, isDensityBased := capacityFunc(n.info) + if capacity == 0 { + return 0 + } if isDensityBased { return float64(used+1) / capacity } From 20e5952c3506cda0517f3098f8501b6a4a9568b0 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 15 Jan 2026 23:07:42 +0500 Subject: [PATCH 6/6] fix c.capacityByFunc is nil --- weed/shell/command_volume_balance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 0643b9bde..bdf91b32e 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -121,9 +121,9 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } return fmt.Errorf("use \"ALL\", \"ACTIVE\" or \"FULL\"") }) + c.capacityByFunc = capacityByMaxVolumeCount balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error { if flagValue == "" { - c.capacityByFunc = capacityByMaxVolumeCount return nil } for allowed, allowedCapacityByFunc := range allowedCapacityBy {