@ -6,6 +6,7 @@ import (
"fmt"
"math/rand/v2"
"sort"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -556,6 +557,48 @@ type ecBalancer struct {
ecNodes [ ] * EcNode
replicaPlacement * super_block . ReplicaPlacement
applyBalancing bool
parallelize bool
wg * sync . WaitGroup
// TODO: Maybe accumulate all errors instead of just the last one.
wgError error
}
type ecBalancerTask func ( ) error
func ( ecb * ecBalancer ) wgInit ( ) {
if ecb . wg == nil {
ecb . wg = & sync . WaitGroup { }
ecb . wgError = nil
}
}
func ( ecb * ecBalancer ) wgAdd ( f ecBalancerTask ) {
if ecb . wg == nil || ! ecb . parallelize {
if err := f ( ) ; err != nil {
ecb . wgError = err
}
return
}
ecb . wg . Add ( 1 )
go func ( ) {
if err := f ( ) ; err != nil {
ecb . wgError = err
}
ecb . wg . Done ( )
} ( )
}
func ( ecb * ecBalancer ) wgWait ( ) error {
if ecb . wg != nil {
ecb . wg . Wait ( )
}
err := ecb . wgError
ecb . wg = nil
ecb . wgError = nil
return err
}
func ( ecb * ecBalancer ) racks ( ) map [ RackId ] * EcRack {
@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
}
func ( ecb * ecBalancer ) deleteDuplicatedEcShards ( collection string ) error {
// vid => []ecNode
vidLocations := ecb . collectVolumeIdToEcNodes ( collection )
// deduplicate ec shards
ecb . wgInit ( )
for vid , locations := range vidLocations {
if err := ecb . doDeduplicateEcShards ( collection , vid , locations ) ; err != nil {
return err
}
ecb . wgAdd ( func ( ) error {
return ecb . doDeduplicateEcShards ( collection , vid , locations )
} )
}
return nil
return ecb . wgWait ( )
}
func ( ecb * ecBalancer ) doDeduplicateEcShards ( collection string , vid needle . VolumeId , locations [ ] * EcNode ) error {
@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
return nil
}
// TODO: enable parallelization
func ( ecb * ecBalancer ) balanceEcShardsAcrossRacks ( collection string ) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb . collectVolumeIdToEcNodes ( collection )
@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR
return targets [ rand . IntN ( len ( targets ) ) ] , nil
}
// TODO: enable parallelization
func ( ecb * ecBalancer ) balanceEcShardsWithinRacks ( collection string ) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb . collectVolumeIdToEcNodes ( collection )
@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error {
return nil
}
// TODO: enable parallelization
func ( ecb * ecBalancer ) doBalanceEcRack ( ecRack * EcRack ) error {
if len ( ecRack . ecNodes ) <= 1 {
return nil
@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo
return vidLocations
}
func EcBalance ( commandEnv * CommandEnv , collections [ ] string , dc string , ecReplicaPlacement * super_block . ReplicaPlacement , applyBalancing bool ) ( err error ) {
func EcBalance ( commandEnv * CommandEnv , collections [ ] string , dc string , ecReplicaPlacement * super_block . ReplicaPlacement , parallelize bool , applyBalancing bool ) ( err error ) {
if len ( collections ) == 0 {
return fmt . Errorf ( "no collections to balance" )
}
@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
ecNodes : allEcNodes ,
replicaPlacement : ecReplicaPlacement ,
applyBalancing : applyBalancing ,
parallelize : parallelize ,
}
for _ , c := range collections {