diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 8a7b53f82..1a0db8f88 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -26,17 +26,14 @@ func init() { Commands = append(Commands, &commandVolumeBalance{}) } -var ( - capacityByFunc = capacityByMaxVolumeCount -) - -const funcNameCapByMinVolumeDensity = "capacityByMinVolumeDensity.func1" +const thresholdVolumeSize = 1.01 type commandVolumeBalance struct { - volumeSizeLimitMb uint64 commandEnv *CommandEnv + capacityByFunc CapacityByFunc volumeByActive *bool applyBalancing bool + volumeSizeLimitMb uint64 } func (c *commandVolumeBalance) Name() string { @@ -126,11 +123,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer }) balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error { if flagValue == "" { + c.capacityByFunc = capacityByMaxVolumeCount return nil } for allowed, allowedCapacityByFunc := range allowedCapacityBy { if flagValue == allowed { - capacityByFunc = allowedCapacityByFunc + c.capacityByFunc = allowedCapacityByFunc return nil } } @@ -226,7 +224,7 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb) }) } - if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.volumeByActive); err != nil { + if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.capacityByFunc); err != nil { return err } @@ -301,9 +299,9 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } if volumeSizeLimitMb == 0 { - volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } @@ -319,7 +317,10 @@ func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint6 for _, volumeInfo := range diskInfo.VolumeInfos { volumeSizes += volumeInfo.Size } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + if volumeSizeLimitMb == 0 { + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte + } + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true } } @@ -339,9 +340,9 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 ecShardCount += erasure_coding.GetShardCount(ecShardInfo) } if volumeSizeLimitMb == 0 { - volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 + volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte } - usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) + usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte) return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false } } @@ -395,14 +396,14 @@ func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLi if volumeByActive == nil { return true } - if (volumeSize*101)/100 < volumeSizeLimitMb*1024*1024 { + if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte { return *volumeByActive } else { return !(*volumeByActive) } } -func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { +func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, capacityByFunc CapacityByFunc) (err error) { ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0) var nodesWithCapacity []*Node capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go index 6c0a8d065..bdecf4be9 100644 --- a/weed/shell/command_volume_balance_test.go +++ b/weed/shell/command_volume_balance_test.go @@ -20,10 +20,6 @@ type testMoveCase struct { expected bool } -func stringPtr(s string) *string { - return &s -} - func TestIsGoodMove(t *testing.T) { var tests = []testMoveCase{ @@ -259,11 +255,14 @@ func TestBalance(t *testing.T) { volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "") volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) diskTypes := collectVolumeDiskTypes(topologyInfo) - c := &commandVolumeBalance{} + c := &commandVolumeBalance{capacityByFunc: capacityByMaxVolumeCount} + if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil { + t.Errorf("balance: %v", err) + } + c = &commandVolumeBalance{capacityByFunc: capacityByMinVolumeDensity, volumeByActive: new(bool)} if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil { t.Errorf("balance: %v", err) } - } func TestVolumeSelection(t *testing.T) {