Browse Source

resolve conversation

pull/8026/head
Konstantin Lebedev 4 weeks ago
parent
commit
26ceded15a
  1. 31
      weed/shell/command_volume_balance.go
  2. 11
      weed/shell/command_volume_balance_test.go

31
weed/shell/command_volume_balance.go

@ -26,17 +26,14 @@ func init() {
Commands = append(Commands, &commandVolumeBalance{})
}
var (
capacityByFunc = capacityByMaxVolumeCount
)
const funcNameCapByMinVolumeDensity = "capacityByMinVolumeDensity.func1"
const thresholdVolumeSize = 1.01
type commandVolumeBalance struct {
volumeSizeLimitMb uint64
commandEnv *CommandEnv
capacityByFunc CapacityByFunc
volumeByActive *bool
applyBalancing bool
volumeSizeLimitMb uint64
}
func (c *commandVolumeBalance) Name() string {
@ -126,11 +123,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
})
balanceCommand.Func("capacityBy", "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"", func(flagValue string) error {
if flagValue == "" {
c.capacityByFunc = capacityByMaxVolumeCount
return nil
}
for allowed, allowedCapacityByFunc := range allowedCapacityBy {
if flagValue == allowed {
capacityByFunc = allowedCapacityByFunc
c.capacityByFunc = allowedCapacityByFunc
return nil
}
}
@ -226,7 +224,7 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis
return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb)
})
}
if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.volumeByActive); err != nil {
if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing, c.volumeSizeLimitMb, c.capacityByFunc); err != nil {
return err
}
@ -301,9 +299,9 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64)
ecShardCount += erasure_coding.GetShardCount(ecShardInfo)
}
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024)
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false
}
@ -319,7 +317,10 @@ func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint6
for _, volumeInfo := range diskInfo.VolumeInfos {
volumeSizes += volumeInfo.Size
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024)
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true
}
}
@ -339,9 +340,9 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64
ecShardCount += erasure_coding.GetShardCount(ecShardInfo)
}
if volumeSizeLimitMb == 0 {
volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024
volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
}
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024)
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false
}
}
@ -395,14 +396,14 @@ func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLi
if volumeByActive == nil {
return true
}
if (volumeSize*101)/100 < volumeSizeLimitMb*1024*1024 {
if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte {
return *volumeByActive
} else {
return !(*volumeByActive)
}
}
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) {
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, capacityByFunc CapacityByFunc) (err error) {
ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0)
var nodesWithCapacity []*Node
capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb)

11
weed/shell/command_volume_balance_test.go

@ -20,10 +20,6 @@ type testMoveCase struct {
expected bool
}
func stringPtr(s string) *string {
return &s
}
func TestIsGoodMove(t *testing.T) {
var tests = []testMoveCase{
@ -259,11 +255,14 @@ func TestBalance(t *testing.T) {
volumeServers := collectVolumeServersByDcRackNode(topologyInfo, "", "", "")
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
diskTypes := collectVolumeDiskTypes(topologyInfo)
c := &commandVolumeBalance{}
c := &commandVolumeBalance{capacityByFunc: capacityByMaxVolumeCount}
if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil {
t.Errorf("balance: %v", err)
}
c = &commandVolumeBalance{capacityByFunc: capacityByMinVolumeDensity, volumeByActive: new(bool)}
if err := c.balanceVolumeServers(diskTypes, volumeReplicas, volumeServers, nil, "ALL_COLLECTIONS"); err != nil {
t.Errorf("balance: %v", err)
}
}
func TestVolumeSelection(t *testing.T) {

Loading…
Cancel
Save