@ -6,7 +6,9 @@ import (
"fmt"
"io"
"os"
"reflect"
"regexp"
"runtime"
"strings"
"time"
@ -25,10 +27,16 @@ func init() {
Commands = append ( Commands , & commandVolumeBalance { } )
}
var (
// capacityByFunc is the capacity strategy used when balancing. It defaults to
// capacityByMaxVolumeCount and is overwritten by the "capacityBy" flag
// handler registered in Do.
// NOTE(review): this is package-level state and nothing visible here resets
// it, so a strategy chosen by one invocation looks like it persists into
// later invocations of the command — confirm whether that is intended.
capacityByFunc = capacityByMaxVolumeCount
)
// funcNameCapByMinVolumeDensity is the suffix compared (via strings.HasSuffix)
// against the runtime name returned by getFuncName for a CapacityFunc, in order
// to detect the closure produced by capacityByMinVolumeDensity.
// NOTE(review): the ".func1" part relies on that closure being the first
// function literal inside capacityByMinVolumeDensity; adding another literal
// before it would presumably break the match — verify when editing.
const funcNameCapByMinVolumeDensity = "capacityByMinVolumeDensity.func1"
// commandVolumeBalance holds the per-invocation settings of the volume balance
// shell command. The fields are filled in by Do from the parsed flags.
type commandVolumeBalance struct {
// volumeSizeLimitMb is the volume size limit in MB; selectVolumesByActive
// multiplies it by 1024*1024 before comparing it with a volume size.
volumeSizeLimitMb uint64
// commandEnv is the shell environment passed to Do.
commandEnv * CommandEnv
// writable mirrors the "writable" flag.
writable bool
// volumeByActive mirrors the "volumeBy" flag: nil for "ALL", a pointer to
// true for "ACTIVE", and a pointer to false for "FULL".
volumeByActive * bool
// applyBalancing mirrors the "apply" flag; it is forwarded to
// balanceSelectedVolume.
applyBalancing bool
}
@ -84,22 +92,55 @@ func (c *commandVolumeBalance) HasTag(CommandTag) bool {
}
func ( c * commandVolumeBalance ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
allowedCapacityBy := map [ string ] CapacityByFunc {
"MAX_VOLUME_COUNT" : capacityByMaxVolumeCount ,
"FREE_VOLUME_COUNT" : capacityByFreeVolumeCount ,
"MIN_VOLUME_DENSITY" : capacityByMinVolumeDensity ,
}
allowedVolumeBy := map [ string ] * bool {
"ALL" : nil ,
"ACTIVE" : new ( bool ) ,
"FULL" : new ( bool ) ,
}
* allowedVolumeBy [ "ACTIVE" ] = true
balanceCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
verbose := balanceCommand . Bool ( "v" , false , "verbose mode" )
collection := balanceCommand . String ( "collection" , "ALL_COLLECTIONS" , "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection" )
dc := balanceCommand . String ( "dataCenter" , "" , "only apply the balancing for this dataCenter" )
racks := balanceCommand . String ( "racks" , "" , "only apply the balancing for this racks" )
nodes := balanceCommand . String ( "nodes" , "" , "only apply the balancing for this nodes" )
writable := balanceCommand . Bool ( "writable" , false , "only apply the balancing for writable volumes" )
noLock := balanceCommand . Bool ( "noLock" , false , "do not lock the admin shell at one's own risk" )
applyBalancing := balanceCommand . Bool ( "apply" , false , "apply the balancing plan." )
// TODO: remove this alias
applyBalancingAlias := balanceCommand . Bool ( "force" , false , "apply the balancing plan (alias for -apply)" )
balanceCommand . Func ( "volumeBy" , "only apply the balancing for ALL volumes and ACTIVE or FULL" , func ( flagValue string ) error {
if flagValue == "" {
return nil
}
for allowed , volumeBy := range allowedVolumeBy {
if flagValue == allowed {
c . volumeByActive = volumeBy
return nil
}
}
return fmt . Errorf ( "use \"ALL\", \"ACTIVE\" or \"FULL\"" )
} )
balanceCommand . Func ( "capacityBy" , "capacityBy function name use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" and \"MIN_VOLUME_DENSITY\"" , func ( flagValue string ) error {
if flagValue == "" {
return nil
}
for allowed , allowedCapacityByFunc := range allowedCapacityBy {
if flagValue == allowed {
capacityByFunc = allowedCapacityByFunc
return nil
}
}
return fmt . Errorf ( "use \"MAX_VOLUME_COUNT\", \"FREE_VOLUME_COUNT\" or \"MIN_VOLUME_DENSITY\"" )
} )
if err = balanceCommand . Parse ( args ) ; err != nil {
return nil
}
handleDeprecatedForceFlag ( writer , balanceCommand , applyBalancingAlias , applyBalancing )
c . writable = * writable
c . applyBalancing = * applyBalancing
infoAboutSimulationMode ( writer , c . applyBalancing , "-apply" )
@ -111,6 +152,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return
}
}
commandEnv . verbose = * verbose
c . commandEnv = commandEnv
// collect topology information
@ -182,13 +224,10 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis
if v . DiskType != string ( diskType ) {
return false
}
if c . writable && v . Size > c . volumeSizeLimitMb {
return false
}
return true
return selectVolumesByActive ( v . Size , c . volumeByActive , c . volumeSizeLimitMb )
} )
}
if err := balanceSelectedVolume ( c . commandEnv , diskType , volumeReplicas , nodes , sortWritableVolumes , c . applyBalancing ) ; err != nil {
if err := balanceSelectedVolume ( c . commandEnv , diskType , volumeReplicas , nodes , sortWritableVolumes , c . applyBalancing , c . volumeSizeLimitMb , c . volumeByActive ) ; err != nil {
return err
}
@ -245,42 +284,76 @@ type Node struct {
rack string
}
type CapacityFunc func ( * master_pb . DataNodeInfo ) float64
type CapacityFunc func ( * master_pb . DataNodeInfo ) ( float64 , uint64 )
type CapacityByFunc func ( diskType types . DiskType , volumeSizeLimitMb uint64 ) CapacityFunc
func capacityByMaxVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) float64 {
func capacityByMaxVolumeCount ( diskType types . DiskType , volumeSizeLimitMb uint64 ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) ( float64 , uint64 ) {
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
if ! found {
return 0
return 0 , 0
}
var volumeSizes uint64
for _ , volumeInfo := range diskInfo . VolumeInfos {
volumeSizes += volumeInfo . Size
}
var ecShardCount int
for _ , ecShardInfo := range diskInfo . EcShardInfos {
ecShardCount += erasure_coding . GetShardCount ( ecShardInfo )
}
return float64 ( diskInfo . MaxVolumeCount ) - float64 ( ecShardCount ) / erasure_coding . DataShardsCount
usedVolumeCount := volumeSizes / ( volumeSizeLimitMb * 1024 * 1024 )
return float64 ( diskInfo . MaxVolumeCount ) - float64 ( ecShardCount ) / erasure_coding . DataShardsCount , usedVolumeCount
}
}
// capacityByMinVolumeDensity returns a CapacityFunc whose capacity is the
// disk's MaxVolumeCount minus its used volume count, where the used volume
// count is the summed size of the disk's volumes divided by
// volumeSizeLimitMb*1024*1024, rounded down.
//
// The subtraction is done in float64 so that a disk holding more data than
// MaxVolumeCount full volumes yields a negative capacity instead of wrapping
// around to a huge unsigned value; balanceSelectedVolume only keeps nodes
// whose capacity is > 0. When volumeSizeLimitMb is 0 the used count is
// reported as 0 rather than dividing by zero.
//
// Keep the returned closure as the first function literal in this function:
// funcNameCapByMinVolumeDensity matches its runtime name by suffix.
func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) CapacityFunc {
	return func(info *master_pb.DataNodeInfo) (float64, uint64) {
		diskInfo, found := info.DiskInfos[string(diskType)]
		if !found {
			return 0, 0
		}

		var volumeSizes uint64
		for _, volumeInfo := range diskInfo.VolumeInfos {
			volumeSizes += volumeInfo.Size
		}

		var usedVolumeCount uint64
		if volumeSizeLimitMb > 0 {
			usedVolumeCount = volumeSizes / (volumeSizeLimitMb * 1024 * 1024)
		}

		return float64(diskInfo.MaxVolumeCount) - float64(usedVolumeCount), usedVolumeCount
	}
}
func capacityByFreeVolumeCount ( diskType types . DiskType ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) float64 {
func capacityByFreeVolumeCount ( diskType types . DiskType , volumeSizeLimitMb uint64 ) CapacityFunc {
return func ( info * master_pb . DataNodeInfo ) ( float64 , uint64 ) {
diskInfo , found := info . DiskInfos [ string ( diskType ) ]
if ! found {
return 0
return 0 , 0
}
var volumeSizes uint64
for _ , volumeInfo := range diskInfo . VolumeInfos {
volumeSizes += volumeInfo . Size
}
var ecShardCount int
for _ , ecShardInfo := range diskInfo . EcShardInfos {
ecShardCount += erasure_coding . GetShardCount ( ecShardInfo )
}
return float64 ( diskInfo . MaxVolumeCount - diskInfo . VolumeCount ) - float64 ( ecShardCount ) / erasure_coding . DataShardsCount
usedVolumeCount := volumeSizes / ( volumeSizeLimitMb * 1024 * 1024 )
return float64 ( diskInfo . MaxVolumeCount - diskInfo . VolumeCount ) - float64 ( ecShardCount ) / erasure_coding . DataShardsCount , usedVolumeCount
}
}
// localVolumeRatio returns the node's current load divided by the capacity
// reported by capacityFunc.
//
// When capacityFunc is the closure built by capacityByMinVolumeDensity
// (detected by its runtime function name), the load is the used volume count
// returned by capacityFunc; otherwise it is the number of selected volumes.
// The division is a float64 division, so a capacity of 0 yields Inf or NaN
// rather than a panic.
func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
	capacity, used := capacityFunc(n.info)
	if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) {
		return float64(used) / capacity
	}
	return float64(len(n.selectedVolumes)) / capacity
}
// localVolumeNextRatio returns the ratio the node would have after receiving
// one more volume: (load + 1) divided by the capacity reported by
// capacityFunc.
//
// When capacityFunc is the closure built by capacityByMinVolumeDensity
// (detected by its runtime function name), the load is the used volume count
// returned by capacityFunc; otherwise it is the number of selected volumes.
// The division is a float64 division, so a capacity of 0 yields Inf rather
// than a panic.
func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
	capacity, used := capacityFunc(n.info)
	if strings.HasSuffix(getFuncName(capacityFunc), funcNameCapByMinVolumeDensity) {
		return float64(used+1) / capacity
	}
	return float64(len(n.selectedVolumes)+1) / capacity
}
func ( n * Node ) isOneVolumeOnly ( ) bool {
@ -312,24 +385,47 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
} )
}
func balanceSelectedVolume ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , sortCandidatesFn func ( volumes [ ] * master_pb . VolumeInformationMessage ) , applyBalancing bool ) ( err error ) {
selectedVolumeCount , volumeMaxCount := 0 , float64 ( 0 )
// selectVolumesByActive reports whether a volume of volumeSize passes the
// active/full filter described by volumeByActive.
//
// A nil volumeByActive accepts every volume. Otherwise the volume counts as
// active when volumeSize*101/100 (integer arithmetic) is below
// volumeSizeLimitMb*1024*1024, and the volume is accepted when that matches
// the value volumeByActive points to: true selects active volumes, false
// selects the remaining ones.
func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLimitMb uint64) bool {
	if volumeByActive == nil {
		return true
	}
	limitBytes := volumeSizeLimitMb * 1024 * 1024
	paddedSize := volumeSize * 101 / 100
	isActive := paddedSize < limitBytes
	return isActive == *volumeByActive
}
// getFuncName returns the runtime name of the function value f, obtained by
// resolving the function's code pointer through runtime.FuncForPC.
// f is expected to be a function value.
func getFuncName(f interface{}) string {
	pc := reflect.ValueOf(f).Pointer()
	fn := runtime.FuncForPC(pc)
	return fn.Name()
}
func balanceSelectedVolume ( commandEnv * CommandEnv , diskType types . DiskType , volumeReplicas map [ uint32 ] [ ] * VolumeReplica , nodes [ ] * Node , sortCandidatesFn func ( volumes [ ] * master_pb . VolumeInformationMessage ) , applyBalancing bool , volumeSizeLimitMb uint64 , volumeByActive * bool ) ( err error ) {
selectedVolumeCount , volumeCapacities := 0 , float64 ( 0 )
var volumeUsageOnLimit uint64
var nodesWithCapacity [ ] * Node
capacityFunc := capacityByMaxVolumeCount ( diskType )
capacityFunc := capacityByFunc ( diskType , volumeSizeLimitMb )
for _ , dn := range nodes {
selectedVolumeCount += len ( dn . selectedVolumes )
capacity := capacityFunc ( dn . info )
capacity , used := capacityFunc ( dn . info )
if capacity > 0 {
nodesWithCapacity = append ( nodesWithCapacity , dn )
}
volumeMaxCount += capacity
volumeCapacities += capacity
volumeUsageOnLimit += used
}
var idealVolumeRatio float64
if strings . HasSuffix ( getFuncName ( capacityFunc ) , funcNameCapByMinVolumeDensity ) {
idealVolumeRatio = float64 ( volumeUsageOnLimit ) / volumeCapacities
} else {
idealVolumeRatio = float64 ( selectedVolumeCount ) / volumeCapacities
}
idealVolumeRatio := float64 ( selectedVolumeCount ) / volumeMaxCount
hasMoved := true
// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
if commandEnv . verbose {
fmt . Fprintf ( os . Stdout , "selected nodes %d, volumes:%d, max:%d, usage on limit: %d, idealVolumeRatio %f\n" , len ( nodesWithCapacity ) , selectedVolumeCount , int64 ( volumeCapacities ) , volumeUsageOnLimit , idealVolumeRatio )
}
for hasMoved {
hasMoved = false
@ -337,7 +433,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
return cmp . Compare ( a . localVolumeRatio ( capacityFunc ) , b . localVolumeRatio ( capacityFunc ) )
} )
if len ( nodesWithCapacity ) == 0 {
fmt . Printf ( "no volume server found with capacity for %s" , diskType . ReadableString ( ) )
fmt . Fprintf ( os . Stdout , "no volume server found with capacity for %s" , diskType . ReadableString ( ) )
return nil
}
@ -345,6 +441,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
var fullNodeIndex int
for fullNodeIndex = len ( nodesWithCapacity ) - 1 ; fullNodeIndex >= 0 ; fullNodeIndex -- {
fullNode = nodesWithCapacity [ fullNodeIndex ]
if len ( fullNode . selectedVolumes ) == 0 {
continue
}
if ! fullNode . isOneVolumeOnly ( ) {
break
}
@ -356,12 +455,19 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
sortCandidatesFn ( candidateVolumes )
for _ , emptyNode := range nodesWithCapacity [ : fullNodeIndex ] {
if ! ( fullNode . localVolumeRatio ( capacityFunc ) > idealVolumeRatio && emptyNode . localVolumeNextRatio ( capacityFunc ) <= idealVolumeRatio ) {
// no more volume servers with empty slots
if commandEnv . verbose {
fmt . Printf ( "no more volume servers with empty slots %s, idealVolumeRatio %f\n" , emptyNode . info . Id , idealVolumeRatio )
}
break
}
fmt . Fprintf ( os . Stdout , "%s %.2f %.2f:%.2f\t" , diskType . ReadableString ( ) , idealVolumeRatio , fullNode . localVolumeRatio ( capacityFunc ) , emptyNode . localVolumeNextRatio ( capacityFunc ) )
if commandEnv . verbose {
fmt . Fprintf ( os . Stdout , "%s %.2f %.2f:%.2f\t" , diskType . ReadableString ( ) , idealVolumeRatio , fullNode . localVolumeRatio ( capacityFunc ) , emptyNode . localVolumeNextRatio ( capacityFunc ) )
}
hasMoved , err = attemptToMoveOneVolume ( commandEnv , volumeReplicas , fullNode , candidateVolumes , emptyNode , applyBalancing )
if err != nil {
if commandEnv . verbose {
fmt . Fprintf ( os . Stdout , "attempt to move one volume error %+v\n" , err )
}
return
}
if hasMoved {