@ -2,7 +2,9 @@ package topology
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"sync"
"sync/atomic"
"time"
@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
func ( t * Topology ) Vacuum ( grpcDialOption grpc . DialOption , garbageThreshold float64 , volumeId uint32 , collection string , preallocate int64 ) {
func ( t * Topology ) Vacuum ( grpcDialOption grpc . DialOption , garbageThreshold float64 , maxParallelVacuumPerServer int , volumeId uint32 , collection string , preallocate int64 ) {
// if there is vacuum going on, return immediately
swapped := atomic . CompareAndSwapInt64 ( & t . vacuumLockCounter , 0 , 1 )
@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t . vacuumOneVolumeId ( grpcDialOption , volumeLayout , c , garbageThreshold , locationList , vid , preallocate )
}
} else {
t . vacuumOneVolumeLayout ( grpcDialOption , volumeLayout , c , garbageThreshold , preallocate )
t . vacuumOneVolumeLayout ( grpcDialOption , volumeLayout , c , garbageThreshold , maxParallelVacuumPerServer , preallocate )
}
}
}
}
}
func ( t * Topology ) vacuumOneVolumeLayout ( grpcDialOption grpc . DialOption , volumeLayout * VolumeLayout , c * Collection , garbageThreshold float64 , preallocate int64 ) {
func ( t * Topology ) vacuumOneVolumeLayout ( grpcDialOption grpc . DialOption , volumeLayout * VolumeLayout , c * Collection , garbageThreshold float64 , maxParallelVacuumPerServer int , preallocate int64 ) {
volumeLayout . accessLock . RLock ( )
tmp Map := make ( map [ needle . VolumeId ] * VolumeLocationList )
todoVolume Map := make ( map [ needle . VolumeId ] * VolumeLocationList )
for vid , locationList := range volumeLayout . vid2location {
tmp Map [ vid ] = locationList . Copy ( )
todoVolume Map [ vid ] = locationList . Copy ( )
}
volumeLayout . accessLock . RUnlock ( )
for vid , locationList := range tmpMap {
t . vacuumOneVolumeId ( grpcDialOption , volumeLayout , c , garbageThreshold , locationList , vid , preallocate )
// limiter for each volume server
limiter := make ( map [ NodeId ] int )
var limiterLock sync . Mutex
for _ , locationList := range todoVolumeMap {
for _ , dn := range locationList . list {
if _ , ok := limiter [ dn . Id ( ) ] ; ! ok {
limiter [ dn . Id ( ) ] = maxParallelVacuumPerServer
}
}
}
executor := util . NewLimitedConcurrentExecutor ( 100 )
var wg sync . WaitGroup
for len ( todoVolumeMap ) > 0 {
pendingVolumeMap := make ( map [ needle . VolumeId ] * VolumeLocationList )
for vid , locationList := range todoVolumeMap {
hasEnoughQuota := true
for _ , dn := range locationList . list {
limiterLock . Lock ( )
quota := limiter [ dn . Id ( ) ]
limiterLock . Unlock ( )
if quota <= 0 {
hasEnoughQuota = false
break
}
}
if ! hasEnoughQuota {
pendingVolumeMap [ vid ] = locationList
continue
}
// debit the quota
for _ , dn := range locationList . list {
limiterLock . Lock ( )
limiter [ dn . Id ( ) ] --
limiterLock . Unlock ( )
}
wg . Add ( 1 )
executor . Execute ( func ( ) {
defer wg . Done ( )
t . vacuumOneVolumeId ( grpcDialOption , volumeLayout , c , garbageThreshold , locationList , vid , preallocate )
// credit the quota
for _ , dn := range locationList . list {
limiterLock . Lock ( )
limiter [ dn . Id ( ) ] ++
limiterLock . Unlock ( )
}
} )
}
if len ( todoVolumeMap ) == len ( pendingVolumeMap ) {
time . Sleep ( 10 * time . Second )
}
todoVolumeMap = pendingVolumeMap
}
wg . Wait ( )
}
func ( t * Topology ) vacuumOneVolumeId ( grpcDialOption grpc . DialOption , volumeLayout * VolumeLayout , c * Collection , garbageThreshold float64 , locationList * VolumeLocationList , vid needle . VolumeId , preallocate int64 ) {