@@ -4,6 +4,7 @@ import (
 	"cmp"
 	"flag"
 	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"io"
 	"os"
 	"regexp"
@@ -21,6 +22,11 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 )
 
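+// thresholdVolumeSize adds a ~1% margin when a volume's size is compared against the
+// size limit, and countZeroSelectedVolumes seeds the density ratio for nodes that have
+// no selected volumes yet, giving them a small non-zero ratio instead of 0.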
+const (
+	thresholdVolumeSize      = 1.01
+	countZeroSelectedVolumes = 0.5
+)
+
 func init() {
 	Commands = append(Commands, &commandVolumeBalance{})
 }
@@ -28,7 +34,7 @@ func init() {
 type commandVolumeBalance struct {
 	volumeSizeLimitMb uint64
 	commandEnv        *CommandEnv
-	writable          bool
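+	// volumeByActive mirrors the -volumeBy flag: nil balances ALL volumes,
+	// true only ACTIVE (not yet full) volumes, false only FULL volumes.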
+	volumeByActive    *bool
 	applyBalancing    bool
 }
 
@@ -84,22 +90,38 @@ func (c *commandVolumeBalance) HasTag(CommandTag) bool {
 }
 
 func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
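+	// Allowed -volumeBy values and what they map to: ALL selects every volume (nil),
+	// ACTIVE selects volumes that are not yet full (true), FULL selects full volumes (false).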
+	allowedVolumeBy := map[string]*bool{
+		"ALL":    nil,
+		"ACTIVE": new(bool),
+		"FULL":   new(bool),
+	}
+	*allowedVolumeBy["ACTIVE"] = true
 	balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	verbose := balanceCommand.Bool("v", false, "verbose mode")
 	collection := balanceCommand.String("collection", "ALL_COLLECTIONS", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
 	dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
 	racks := balanceCommand.String("racks", "", "only apply the balancing for this racks")
 	nodes := balanceCommand.String("nodes", "", "only apply the balancing for this nodes")
-	writable := balanceCommand.Bool("writable", false, "only apply the balancing for writable volumes")
 	noLock := balanceCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
 	applyBalancing := balanceCommand.Bool("apply", false, "apply the balancing plan.")
 	// TODO: remove this alias
 	applyBalancingAlias := balanceCommand.Bool("force", false, "apply the balancing plan (alias for -apply)")
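+	// -volumeBy is validated here: only the keys of allowedVolumeBy are accepted,
+	// and the matching value is stored in c.volumeByActive.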
+	balanceCommand.Func("volumeBy", "restrict balancing to \"ALL\" volumes, only \"ACTIVE\" (not yet full) volumes, or only \"FULL\" volumes", func(flagValue string) error {
+		if flagValue == "" {
+			return nil
+		}
+		for allowed, volumeBy := range allowedVolumeBy {
+			if flagValue == allowed {
+				c.volumeByActive = volumeBy
+				return nil
+			}
+		}
+		return fmt.Errorf("use \"ALL\", \"ACTIVE\" or \"FULL\"")
+	})
 	if err = balanceCommand.Parse(args); err != nil {
 		return nil
 	}
 
 	handleDeprecatedForceFlag(writer, balanceCommand, applyBalancingAlias, applyBalancing)
-	c.writable = *writable
 	c.applyBalancing = *applyBalancing
 	infoAboutSimulationMode(writer, c.applyBalancing, "-apply")
@@ -111,6 +133,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
 			return
 		}
 	}
+	commandEnv.verbose = *verbose
 	c.commandEnv = commandEnv
 
 	// collect topology information
@@ -182,13 +205,10 @@ func (c *commandVolumeBalance) balanceVolumeServersByDiskType(diskType types.Dis
 			if v.DiskType != string(diskType) {
 				return false
 			}
-			if c.writable && v.Size > c.volumeSizeLimitMb {
-				return false
-			}
-			return true
+			return selectVolumesByActive(v.Size, c.volumeByActive, c.volumeSizeLimitMb)
 		})
 	}
-	if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.applyBalancing); err != nil {
+	if err := balanceSelectedVolume(c.commandEnv, diskType, volumeReplicas, nodes, sortWritableVolumes, c.volumeSizeLimitMb, c.applyBalancing); err != nil {
 		return err
 	}
 
@@ -246,6 +266,25 @@ type Node struct {
 }
 
 type CapacityFunc func(*master_pb.DataNodeInfo) float64
+type DensityFunc func(*master_pb.DataNodeInfo) (float64, uint64)
 
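+// capacityByMinVolumeDensity returns, for one disk type, the number of free volume
+// slots on a node (max volume count minus the slots already consumed by actual volume
+// sizes) together with the estimated number of used volume slots.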
+func capacityByMinVolumeDensity(diskType types.DiskType, volumeSizeLimitMb uint64) DensityFunc {
+	return func(info *master_pb.DataNodeInfo) (float64, uint64) {
+		diskInfo, found := info.DiskInfos[string(diskType)]
+		if !found {
+			return 0, 0
+		}
+		var volumeSizes uint64
+		for _, volumeInfo := range diskInfo.VolumeInfos {
+			volumeSizes += volumeInfo.Size
+		}
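+		// fall back to the default volume size limit (in MB) when none is configured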
+		if volumeSizeLimitMb == 0 {
+			volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
+		}
+		usedVolumeCount := volumeSizes / (volumeSizeLimitMb * util.MiByte)
+		return float64(diskInfo.MaxVolumeCount - int64(usedVolumeCount)), usedVolumeCount
+	}
+}
+
 func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc {
 	return func(info *master_pb.DataNodeInfo) float64 {
@@ -275,12 +314,27 @@ func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc {
 	}
 }
 
-func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
-	return float64(len(n.selectedVolumes)) / capacityFunc(n.info)
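+// localVolumeDensityRatio compares a node's estimated used volume slots against its
+// remaining capacity, as reported by the DensityFunc; nodes with zero used slots get
+// a small non-zero ratio instead of 0.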
+func (n *Node) localVolumeDensityRatio(capacityFunc DensityFunc) float64 {
+	capacity, selectedVolumes := capacityFunc(n.info)
+	if capacity == 0 {
+		return 0
+	}
+	if selectedVolumes == 0 {
+		return countZeroSelectedVolumes / capacity
+	}
+	return float64(selectedVolumes) / capacity
 }
 
-func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
-	return float64(len(n.selectedVolumes)+1) / capacityFunc(n.info)
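+// localVolumeDensityNextRatio is the density the node would have after receiving one
+// more volume, i.e. the same ratio with one extra used slot counted.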
+func (n *Node) localVolumeDensityNextRatio(capacityFunc DensityFunc) float64 {
+	capacity, selectedVolumes := capacityFunc(n.info)
+	if capacity == 0 {
+		return 0
+	}
+	return float64(selectedVolumes+1) / capacity
 }
 
+func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
+	return float64(len(n.selectedVolumes)) / capacityFunc(n.info)
+}
+
 func (n *Node) isOneVolumeOnly() bool {
@@ -312,32 +366,51 @@ func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
 	})
 }
 
-func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
-	selectedVolumeCount, volumeMaxCount := 0, float64(0)
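+// selectVolumesByActive decides whether a volume takes part in balancing: with a nil
+// selector every volume qualifies; otherwise volumes below the size limit (with a ~1%
+// margin) match ACTIVE (true) and the remaining ones match FULL (false).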
+func selectVolumesByActive(volumeSize uint64, volumeByActive *bool, volumeSizeLimitMb uint64) bool {
+	if volumeByActive == nil {
+		return true
+	}
+	if uint64(float64(volumeSize)*thresholdVolumeSize) < volumeSizeLimitMb*util.MiByte {
+		return *volumeByActive
+	} else {
+		return !(*volumeByActive)
+	}
+}
+
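+// balanceSelectedVolume spreads the selected volumes across the nodes that still have
+// capacity, using the volume-density ratios above to pick donor and receiver nodes.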
+func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), volumeSizeLimitMb uint64, applyBalancing bool) (err error) {
+	selectedVolumeCount, volumeCapacities := uint64(0), float64(0)
 	var nodesWithCapacity []*Node
-	capacityFunc := capacityByMaxVolumeCount(diskType)
+	if volumeSizeLimitMb == 0 {
+		volumeSizeLimitMb = util.VolumeSizeLimitGB * util.KiByte
+	}
+	capacityFunc := capacityByMinVolumeDensity(diskType, volumeSizeLimitMb)
 	for _, dn := range nodes {
-		selectedVolumeCount += len(dn.selectedVolumes)
-		capacity := capacityFunc(dn.info)
+		capacity, volumeCount := capacityFunc(dn.info)
 		if capacity > 0 {
 			nodesWithCapacity = append(nodesWithCapacity, dn)
 		}
-		volumeMaxCount += capacity
+		volumeCapacities += capacity
+		selectedVolumeCount += volumeCount
 	}
 
-	idealVolumeRatio := float64(selectedVolumeCount) / volumeMaxCount
+	if volumeCapacities == 0 {
+		return nil
+	}
+	idealVolumeRatio := float64(selectedVolumeCount) / volumeCapacities
 
 	hasMoved := true
 
-	// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
+	if commandEnv != nil && commandEnv.verbose {
+		fmt.Fprintf(os.Stdout, "selected nodes %d, volumes:%d, cap:%d, idealVolumeRatio %f\n", len(nodesWithCapacity), selectedVolumeCount, int64(volumeCapacities), idealVolumeRatio*100)
+	}
	for hasMoved {
 		hasMoved = false
 		slices.SortFunc(nodesWithCapacity, func(a, b *Node) int {
-			return cmp.Compare(a.localVolumeRatio(capacityFunc), b.localVolumeRatio(capacityFunc))
+			return cmp.Compare(a.localVolumeDensityRatio(capacityFunc), b.localVolumeDensityRatio(capacityFunc))
 		})
 		if len(nodesWithCapacity) == 0 {
-			fmt.Printf("no volume server found with capacity for %s", diskType.ReadableString())
+			if commandEnv != nil && commandEnv.verbose {
+				fmt.Fprintf(os.Stdout, "no volume server found with capacity for %s", diskType.ReadableString())
+			}
 			return nil
 		}
@@ -345,6 +418,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
 		var fullNodeIndex int
 		for fullNodeIndex = len(nodesWithCapacity) - 1; fullNodeIndex >= 0; fullNodeIndex-- {
 			fullNode = nodesWithCapacity[fullNodeIndex]
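+			// a node with nothing selected cannot donate a volume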
+			if len(fullNode.selectedVolumes) == 0 {
+				continue
+			}
 			if !fullNode.isOneVolumeOnly() {
 				break
 			}
@@ -353,15 +429,34 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
 		for _, v := range fullNode.selectedVolumes {
 			candidateVolumes = append(candidateVolumes, v)
 		}
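+		// nothing to move if every node with capacity is empty or holds a single volume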
+		if fullNodeIndex == -1 {
+			if commandEnv != nil && commandEnv.verbose {
+				fmt.Fprintf(os.Stdout, "no nodes with capacity found for %s, nodes %d", diskType.ReadableString(), len(nodesWithCapacity))
+			}
+			return nil
+		}
 		sortCandidatesFn(candidateVolumes)
 
 		for _, emptyNode := range nodesWithCapacity[:fullNodeIndex] {
-			if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
-				// no more volume servers with empty slots
+			if !(fullNode.localVolumeDensityNextRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeDensityNextRatio(capacityFunc) <= idealVolumeRatio) {
+				if commandEnv != nil && commandEnv.verbose {
+					fmt.Printf("no more volume servers with empty slots %s, idealVolumeRatio %f\n", emptyNode.info.Id, idealVolumeRatio)
+				}
 				break
 			}
-			fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio, fullNode.localVolumeRatio(capacityFunc), emptyNode.localVolumeNextRatio(capacityFunc))
+			fmt.Fprintf(os.Stdout, "%s %.2f %.2f:%.2f\t", diskType.ReadableString(), idealVolumeRatio,
+				fullNode.localVolumeDensityRatio(capacityFunc), emptyNode.localVolumeDensityNextRatio(capacityFunc))
+			if commandEnv != nil && commandEnv.verbose {
+				fmt.Fprintf(os.Stdout, "%s %.1f %.1f:%.1f\t", diskType.ReadableString(), idealVolumeRatio*100,
+					fullNode.localVolumeDensityRatio(capacityFunc)*100, emptyNode.localVolumeDensityNextRatio(capacityFunc)*100)
+			}
 			hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing)
 			if err != nil {
+				if commandEnv != nil && commandEnv.verbose {
+					fmt.Fprintf(os.Stdout, "attempt to move one volume error %+v\n", err)
+				}
+				if strings.Contains(err.Error(), util.ErrVolumeNoSpaceLeft) {
+					continue
+				}
 				return
 			}
 			if hasMoved {
@@ -420,7 +515,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
 	}
 	fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
 	if applyChange {
-		return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, false)
+		return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, 0, v.ReadOnly)
 	}
 	return nil
 }