@ -32,12 +32,17 @@ type FilerOperations interface {
// folderState tracks the state of a folder for empty folder cleanup
type folderState struct {
roughCount int // Cached rough count (up to maxCountCheck)
isImplicit * bool // Tri-state boolean: nil (unknown), true (implicit), false (explicit)
lastAddTime time . Time // Last time an item was added
lastDelTime time . Time // Last time an item was deleted
lastCheck time . Time // Last time we checked the actual count
}
type bucketCleanupPolicyState struct {
autoRemove bool
attrValue string
lastCheck time . Time
}
// EmptyFolderCleaner handles asynchronous cleanup of empty folders
// Each filer owns specific folders via consistent hashing based on the peer filer list
type EmptyFolderCleaner struct {
@ -46,8 +51,9 @@ type EmptyFolderCleaner struct {
host pb . ServerAddress
// Folder state tracking
mu sync . RWMutex
folderCounts map [ string ] * folderState // Rough count cache
mu sync . RWMutex
folderCounts map [ string ] * folderState // Rough count cache
bucketCleanupPolicies map [ string ] * bucketCleanupPolicyState // bucket path -> cleanup policy cache
// Cleanup queue (thread-safe, has its own lock)
cleanupQueue * CleanupQueue
@ -66,17 +72,18 @@ type EmptyFolderCleaner struct {
// NewEmptyFolderCleaner creates a new EmptyFolderCleaner
func NewEmptyFolderCleaner ( filer FilerOperations , lockRing * lock_manager . LockRing , host pb . ServerAddress , bucketPath string ) * EmptyFolderCleaner {
efc := & EmptyFolderCleaner {
filer : filer ,
lockRing : lockRing ,
host : host ,
folderCounts : make ( map [ string ] * folderState ) ,
cleanupQueue : NewCleanupQueue ( DefaultQueueMaxSize , DefaultQueueMaxAge ) ,
maxCountCheck : DefaultMaxCountCheck ,
cacheExpiry : DefaultCacheExpiry ,
processorSleep : DefaultProcessorSleep ,
bucketPath : bucketPath ,
enabled : true ,
stopCh : make ( chan struct { } ) ,
filer : filer ,
lockRing : lockRing ,
host : host ,
folderCounts : make ( map [ string ] * folderState ) ,
bucketCleanupPolicies : make ( map [ string ] * bucketCleanupPolicyState ) ,
cleanupQueue : NewCleanupQueue ( DefaultQueueMaxSize , DefaultQueueMaxAge ) ,
maxCountCheck : DefaultMaxCountCheck ,
cacheExpiry : DefaultCacheExpiry ,
processorSleep : DefaultProcessorSleep ,
bucketPath : bucketPath ,
enabled : true ,
stopCh : make ( chan struct { } ) ,
}
go efc . cacheEvictionLoop ( )
go efc . cleanupProcessor ( )
@ -268,58 +275,19 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string, triggeredBy string)
return
}
// Check for explicit implicit_dir attribute
// First check cache
ctx := context . Background ( )
efc . mu . RLock ( )
var cachedImplicit * bool
if state , exists := efc . folderCounts [ folder ] ; exists {
cachedImplicit = state . isImplicit
}
efc . mu . RUnlock ( )
var isImplicit bool
implicitSource := "cache"
implicitAttr := "<cached>"
if cachedImplicit != nil {
isImplicit = * cachedImplicit
if isImplicit {
implicitAttr = "true"
} else {
implicitAttr = "false"
}
} else {
implicitSource = "filer"
// Not cached, check filer
attrs , err := efc . filer . GetEntryAttributes ( ctx , util . FullPath ( folder ) )
if err != nil {
if err == filer_pb . ErrNotFound {
return
}
glog . V ( 2 ) . Infof ( "EmptyFolderCleaner: error getting attributes for %s: %v" , folder , err )
bucketPath , autoRemove , source , attrValue , err := efc . getBucketCleanupPolicy ( ctx , folder )
if err != nil {
if err == filer_pb . ErrNotFound {
return
}
isImplicit = attrs != nil && string ( attrs [ s3_constants . ExtS3ImplicitDir ] ) == "true"
if attrs == nil {
implicitAttr = "<no_attrs>"
} else if value , found := attrs [ s3_constants . ExtS3ImplicitDir ] ; found {
implicitAttr = string ( value )
} else {
implicitAttr = "<missing>"
}
// Update cache
efc . mu . Lock ( )
if _ , exists := efc . folderCounts [ folder ] ; ! exists {
efc . folderCounts [ folder ] = & folderState { }
}
efc . folderCounts [ folder ] . isImplicit = & isImplicit
efc . mu . Unlock ( )
glog . V ( 2 ) . Infof ( "EmptyFolderCleaner: failed to load bucket cleanup policy for folder %s (triggered by %s): %v" , folder , triggeredBy , err )
return
}
if ! isImplicit {
glog . Infof ( "EmptyFolderCleaner: folder %s (triggered by %s) is not marked as implicit (source=%s attr=%s), skipping" , folder , triggeredBy , implicitSource , implicitAttr )
if ! autoRemove {
glog . Infof ( "EmptyFolderCleaner: skipping folder %s (triggered by %s), bucket %s auto-remove-empty-folders disabled (source=%s attr=%s)" ,
folder , triggeredBy , bucketPath , source , attrValue )
return
}
@ -371,6 +339,80 @@ func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string)
return efc . filer . DeleteEntryMetaAndData ( ctx , util . FullPath ( folder ) , false , false , false , false , nil , 0 )
}
func ( efc * EmptyFolderCleaner ) getBucketCleanupPolicy ( ctx context . Context , folder string ) ( bucketPath string , autoRemove bool , source string , attrValue string , err error ) {
bucketPath , ok := extractBucketPath ( folder , efc . bucketPath )
if ! ok {
return "" , true , "default" , "<not_bucket_path>" , nil
}
now := time . Now ( )
efc . mu . RLock ( )
if state , found := efc . bucketCleanupPolicies [ bucketPath ] ; found && now . Sub ( state . lastCheck ) <= efc . cacheExpiry {
efc . mu . RUnlock ( )
return bucketPath , state . autoRemove , "cache" , state . attrValue , nil
}
efc . mu . RUnlock ( )
attrs , err := efc . filer . GetEntryAttributes ( ctx , util . FullPath ( bucketPath ) )
if err != nil {
return "" , true , "" , "" , err
}
autoRemove , attrValue = autoRemoveEmptyFoldersEnabled ( attrs )
efc . mu . Lock ( )
if efc . bucketCleanupPolicies == nil {
efc . bucketCleanupPolicies = make ( map [ string ] * bucketCleanupPolicyState )
}
efc . bucketCleanupPolicies [ bucketPath ] = & bucketCleanupPolicyState {
autoRemove : autoRemove ,
attrValue : attrValue ,
lastCheck : now ,
}
efc . mu . Unlock ( )
return bucketPath , autoRemove , "filer" , attrValue , nil
}
func extractBucketPath ( folder string , bucketPath string ) ( string , bool ) {
if bucketPath == "" {
return "" , false
}
cleanBucketPath := strings . TrimSuffix ( bucketPath , "/" )
prefix := cleanBucketPath + "/"
if ! strings . HasPrefix ( folder , prefix ) {
return "" , false
}
rest := strings . TrimPrefix ( folder , prefix )
bucketName , _ , found := strings . Cut ( rest , "/" )
if ! found || bucketName == "" {
return "" , false
}
return prefix + bucketName , true
}
func autoRemoveEmptyFoldersEnabled ( attrs map [ string ] [ ] byte ) ( bool , string ) {
if attrs == nil {
return true , "<no_attrs>"
}
value , found := attrs [ s3_constants . ExtAutoRemoveEmptyFolders ]
if ! found {
return true , "<missing>"
}
text := strings . TrimSpace ( string ( value ) )
if text == "" {
return true , "<empty>"
}
return ! strings . EqualFold ( text , "false" ) , text
}
// isUnderPath checks if child is under parent path
func isUnderPath ( child , parent string ) bool {
if parent == "" || parent == "/" {
@ -459,6 +501,12 @@ func (efc *EmptyFolderCleaner) evictStaleCacheEntries() {
}
}
for bucketPath , state := range efc . bucketCleanupPolicies {
if now . Sub ( state . lastCheck ) > efc . cacheExpiry {
delete ( efc . bucketCleanupPolicies , bucketPath )
}
}
if expiredCount > 0 {
glog . V ( 3 ) . Infof ( "EmptyFolderCleaner: evicted %d stale cache entries" , expiredCount )
}
@ -474,6 +522,7 @@ func (efc *EmptyFolderCleaner) Stop() {
efc . enabled = false
efc . cleanupQueue . Clear ( )
efc . folderCounts = make ( map [ string ] * folderState ) // Clear cache on stop
efc . bucketCleanupPolicies = make ( map [ string ] * bucketCleanupPolicyState )
}
// GetPendingCleanupCount returns the number of pending cleanup tasks (for testing)