|
|
@ -6,9 +6,7 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"io" |
|
|
"io" |
|
|
"os" |
|
|
"os" |
|
|
"reflect" |
|
|
|
|
|
"regexp" |
|
|
"regexp" |
|
|
"runtime" |
|
|
|
|
|
"strings" |
|
|
"strings" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
@ -285,14 +283,14 @@ type Node struct { |
|
|
rack string |
|
|
rack string |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64) |
|
|
|
|
|
|
|
|
type CapacityFunc func(*master_pb.DataNodeInfo) (float64, uint64, bool) |
|
|
type CapacityByFunc func(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc |
|
|
type CapacityByFunc func(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc |
|
|
|
|
|
|
|
|
func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64) { |
|
|
|
|
|
|
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
if !found { |
|
|
if !found { |
|
|
return 0, 0 |
|
|
|
|
|
|
|
|
return 0, 0, false |
|
|
} |
|
|
} |
|
|
var volumeSizes uint64 |
|
|
var volumeSizes uint64 |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
@ -307,30 +305,30 @@ func capacityByMaxVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) |
|
|
} |
|
|
} |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
|
|
|
|
|
|
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount |
|
|
|
|
|
|
|
|
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64) { |
|
|
|
|
|
|
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
if !found { |
|
|
if !found { |
|
|
return 0, 0 |
|
|
|
|
|
|
|
|
return 0, 0, true |
|
|
} |
|
|
} |
|
|
var volumeSizes uint64 |
|
|
var volumeSizes uint64 |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
volumeSizes += volumeInfo.Size |
|
|
volumeSizes += volumeInfo.Size |
|
|
} |
|
|
} |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount |
|
|
|
|
|
|
|
|
return float64(uint64(diskInfo.MaxVolumeCount) - usedVolumeCount), usedVolumeCount, true |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc { |
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64) { |
|
|
|
|
|
|
|
|
return func(info *master_pb.DataNodeInfo) (float64, uint64, bool) { |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
diskInfo, found := info.DiskInfos[string(diskType)] |
|
|
if !found { |
|
|
if !found { |
|
|
return 0, 0 |
|
|
|
|
|
|
|
|
return 0, 0, false |
|
|
} |
|
|
} |
|
|
var volumeSizes uint64 |
|
|
var volumeSizes uint64 |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
for _, volumeInfo := range diskInfo.VolumeInfos { |
|
|
@ -344,21 +342,21 @@ func capacityByFreeVolumeCount(diskType types.DiskType, volumeSizeLimitMb uint64 |
|
|
volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 |
|
|
volumeSizeLimitMb = util.VolumeSizeLimitGB * 1024 |
|
|
} |
|
|
} |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
usedVolumeCount := volumeSizes / (volumeSizeLimitMb * 1024 * 1024) |
|
|
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount |
|
|
|
|
|
|
|
|
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount, usedVolumeCount, false |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { |
|
|
func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { |
|
|
capacity, used := capacityFunc(n.info) |
|
|
|
|
|
if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { |
|
|
|
|
|
|
|
|
capacity, used, isDensityBased := capacityFunc(n.info) |
|
|
|
|
|
if isDensityBased { |
|
|
return float64(used) / capacity |
|
|
return float64(used) / capacity |
|
|
} |
|
|
} |
|
|
return float64(len(n.selectedVolumes)) / capacity |
|
|
return float64(len(n.selectedVolumes)) / capacity |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { |
|
|
func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { |
|
|
capacity, used := capacityFunc(n.info) |
|
|
|
|
|
if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { |
|
|
|
|
|
|
|
|
capacity, used, isDensityBased := capacityFunc(n.info) |
|
|
|
|
|
if isDensityBased { |
|
|
return float64(used+1) / capacity |
|
|
return float64(used+1) / capacity |
|
|
} |
|
|
} |
|
|
return float64(len(n.selectedVolumes)+1) / capacity |
|
|
return float64(len(n.selectedVolumes)+1) / capacity |
|
|
@ -404,35 +402,30 @@ func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLi |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func getFuncName(f interface{}) string { |
|
|
|
|
|
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { |
|
|
func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool, volumeSizeLimitMb uint64, volumeByActive *bool) (err error) { |
|
|
selectedVolumeCount, volumeCapacities := 0, float64(0) |
|
|
|
|
|
var volumeUsageOnLimit uint64 |
|
|
|
|
|
|
|
|
ratioVolumeCount, volumeCapacities, idealVolumeRatio := uint64(0), float64(0), float64(0) |
|
|
var nodesWithCapacity []*Node |
|
|
var nodesWithCapacity []*Node |
|
|
capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) |
|
|
capacityFunc := capacityByFunc(diskType, volumeSizeLimitMb) |
|
|
for _, dn := range nodes { |
|
|
for _, dn := range nodes { |
|
|
selectedVolumeCount += len(dn.selectedVolumes) |
|
|
|
|
|
capacity, used := capacityFunc(dn.info) |
|
|
|
|
|
|
|
|
capacity, volumeUsed, isDensityBased := capacityFunc(dn.info) |
|
|
if capacity > 0 { |
|
|
if capacity > 0 { |
|
|
nodesWithCapacity = append(nodesWithCapacity, dn) |
|
|
nodesWithCapacity = append(nodesWithCapacity, dn) |
|
|
} |
|
|
} |
|
|
|
|
|
if isDensityBased { |
|
|
|
|
|
ratioVolumeCount += volumeUsed |
|
|
|
|
|
} else { |
|
|
|
|
|
ratioVolumeCount += uint64(len(dn.selectedVolumes)) |
|
|
|
|
|
} |
|
|
volumeCapacities += capacity |
|
|
volumeCapacities += capacity |
|
|
volumeUsageOnLimit += used |
|
|
|
|
|
} |
|
|
} |
|
|
var idealVolumeRatio float64 |
|
|
|
|
|
if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) { |
|
|
|
|
|
idealVolumeRatio = float64(volumeUsageOnLimit) / volumeCapacities |
|
|
|
|
|
} else { |
|
|
|
|
|
idealVolumeRatio = float64(selectedVolumeCount) / volumeCapacities |
|
|
|
|
|
|
|
|
if volumeCapacities > 0 { |
|
|
|
|
|
idealVolumeRatio = float64(ratioVolumeCount) / volumeCapacities |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
hasMoved := true |
|
|
hasMoved := true |
|
|
|
|
|
|
|
|
if commandEnv != nil && commandEnv.verbose { |
|
|
if commandEnv != nil && commandEnv.verbose { |
|
|
fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, usage on limit: %d, idealVolumeRatio %f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), volumeUsageOnLimit, idealVolumeRatio) |
|
|
|
|
|
|
|
|
fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, max:%d, idealVolumeRatio %f\n", len(nodesWithCapacity), ratioVolumeCount, int64(volumeCapacities), idealVolumeRatio) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
for hasMoved { |
|
|
for hasMoved { |
|
|
|