@ -6,6 +6,7 @@ import ( 
			
		
	
		
			
				
						"fmt"  
			
		
	
		
			
				
						"math/rand/v2"  
			
		
	
		
			
				
						"sort"  
			
		
	
		
			
				
						"sync"  
			
		
	
		
			
				
						"time"  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						"github.com/seaweedfs/seaweedfs/weed/glog"  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -556,6 +557,48 @@ type ecBalancer struct { 
			
		
	
		
			
				
						ecNodes           [ ] * EcNode  
			
		
	
		
			
				
						replicaPlacement  * super_block . ReplicaPlacement  
			
		
	
		
			
				
						applyBalancing    bool  
			
		
	
		
			
				
						parallelize       bool  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						wg  * sync . WaitGroup  
			
		
	
		
			
				
						// TODO: Maybe accumulate all errors instead of just the last one.
  
			
		
	
		
			
				
						wgError  error  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					type  ecBalancerTask  func ( )  error  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  wgInit ( )  {  
			
		
	
		
			
				
						if  ecb . wg  ==  nil  {  
			
		
	
		
			
				
							ecb . wg  =  & sync . WaitGroup { }  
			
		
	
		
			
				
							ecb . wgError  =  nil  
			
		
	
		
			
				
						}  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  wgAdd ( f  ecBalancerTask )  {  
			
		
	
		
			
				
						if  ecb . wg  ==  nil  ||  ! ecb . parallelize  {  
			
		
	
		
			
				
							if  err  :=  f ( ) ;  err  !=  nil  {  
			
		
	
		
			
				
								ecb . wgError  =  err  
			
		
	
		
			
				
							}  
			
		
	
		
			
				
							return  
			
		
	
		
			
				
						}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						ecb . wg . Add ( 1 )  
			
		
	
		
			
				
						go  func ( )  {  
			
		
	
		
			
				
							if  err  :=  f ( ) ;  err  !=  nil  {  
			
		
	
		
			
				
								ecb . wgError  =  err  
			
		
	
		
			
				
							}  
			
		
	
		
			
				
							ecb . wg . Done ( )  
			
		
	
		
			
				
						} ( )  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  wgWait ( )  error  {  
			
		
	
		
			
				
						if  ecb . wg  !=  nil  {  
			
		
	
		
			
				
							ecb . wg . Wait ( )  
			
		
	
		
			
				
						}  
			
		
	
		
			
				
						err  :=  ecb . wgError  
			
		
	
		
			
				
						ecb . wg  =  nil  
			
		
	
		
			
				
						ecb . wgError  =  nil  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						return  err  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  racks ( )  map [ RackId ] * EcRack  {  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -592,15 +635,15 @@ func (ecb *ecBalancer) balanceEcVolumes(collection string) error { 
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  deleteDuplicatedEcShards ( collection  string )  error  {  
			
		
	
		
			
				
						// vid => []ecNode
  
			
		
	
		
			
				
						vidLocations  :=  ecb . collectVolumeIdToEcNodes ( collection )  
			
		
	
		
			
				
						// deduplicate ec shards
  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						ecb . wgInit ( )  
			
		
	
		
			
				
						for  vid ,  locations  :=  range  vidLocations  {  
			
		
	
		
			
				
							if  err  :=  ecb . doDeduplicateEcShards ( collection ,  vid ,  locations ) ;  err  !=  nil {  
			
		
	
		
			
				
								return  err   
			
		
	
		
			
				
							}  
			
		
	
		
			
				
							ecb . wgAdd ( func ( )  error {  
			
		
	
		
			
				
								return  ecb . doDeduplicateEcShards ( collection ,  vid ,  locations )   
			
		
	
		
			
				
							} )  
			
		
	
		
			
				
						}  
			
		
	
		
			
				
						return  nil  
			
		
	
		
			
				
						return  ecb . wgWait ( )  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  doDeduplicateEcShards ( collection  string ,  vid  needle . VolumeId ,  locations  [ ] * EcNode )  error  {  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -636,6 +679,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum 
			
		
	
		
			
				
						return  nil  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					// TODO: enable parallelization
  
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  balanceEcShardsAcrossRacks ( collection  string )  error  {  
			
		
	
		
			
				
						// collect vid => []ecNode, since previous steps can change the locations
  
			
		
	
		
			
				
						vidLocations  :=  ecb . collectVolumeIdToEcNodes ( collection )  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -741,6 +785,7 @@ func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcR 
			
		
	
		
			
				
						return  targets [ rand . IntN ( len ( targets ) ) ] ,  nil  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					// TODO: enable parallelization
  
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  balanceEcShardsWithinRacks ( collection  string )  error  {  
			
		
	
		
			
				
						// collect vid => []ecNode, since previous steps can change the locations
  
			
		
	
		
			
				
						vidLocations  :=  ecb . collectVolumeIdToEcNodes ( collection )  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -809,6 +854,7 @@ func (ecb *ecBalancer) balanceEcRacks() error { 
			
		
	
		
			
				
						return  nil  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					// TODO: enable parallelization
  
			
		
	
		
			
				
					func  ( ecb  * ecBalancer )  doBalanceEcRack ( ecRack  * EcRack )  error  {  
			
		
	
		
			
				
						if  len ( ecRack . ecNodes )  <=  1  {  
			
		
	
		
			
				
							return  nil  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -1001,7 +1047,7 @@ func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.Vo 
			
		
	
		
			
				
						return  vidLocations  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					func  EcBalance ( commandEnv  * CommandEnv ,  collections  [ ] string ,  dc  string ,  ecReplicaPlacement  * super_block . ReplicaPlacement ,  applyBalancing  bool )  ( err  error )  {  
			
		
	
		
			
				
					func  EcBalance ( commandEnv  * CommandEnv ,  collections  [ ] string ,  dc  string ,  ecReplicaPlacement  * super_block . ReplicaPlacement ,  parallelize  bool ,  applyBalancing  bool )  ( err  error )  {  
			
		
	
		
			
				
						if  len ( collections )  ==  0  {  
			
		
	
		
			
				
							return  fmt . Errorf ( "no collections to balance" )  
			
		
	
		
			
				
						}  
			
		
	
	
		
			
				
					
						
						
						
							
								 
						
					 
				
				@ -1020,6 +1066,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic 
			
		
	
		
			
				
							ecNodes :           allEcNodes ,  
			
		
	
		
			
				
							replicaPlacement :  ecReplicaPlacement ,  
			
		
	
		
			
				
							applyBalancing :    applyBalancing ,  
			
		
	
		
			
				
							parallelize :       parallelize ,  
			
		
	
		
			
				
						}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						for  _ ,  c  :=  range  collections  {