
filer: async empty folder cleanup via metadata events (#7614)

* filer: async empty folder cleanup via metadata events

Implements asynchronous empty folder cleanup when files are deleted in S3.

Key changes:

1. EmptyFolderCleaner - New component that handles folder cleanup:
   - Uses consistent hashing (LockRing) to determine folder ownership (see the sketch below)
   - Each filer owns specific folders, avoiding duplicate cleanup work
   - Debounces delete events (10s delay) to batch multiple deletes
   - Caches rough folder counts to skip unnecessary checks
   - Cancels pending cleanup when new files are created
   - Handles both file and subdirectory deletions

2. Integration with metadata events:
   - Listens to both local and remote filer metadata events
   - Processes create/delete/rename events to track folder state
   - Only processes folders under /buckets/<bucket>/...

3. Removed synchronous empty folder cleanup from S3 handlers:
   - DeleteObjectHandler no longer calls DoDeleteEmptyParentDirectories
   - DeleteMultipleObjectsHandler no longer tracks/cleans directories
   - Cleanup now happens asynchronously via metadata events

Benefits:
- Non-blocking: S3 delete requests return immediately
- Coordinated: Only one filer (the owner) cleans each folder
- Efficient: Batching and caching reduce unnecessary checks
- Event-driven: Folder deletion triggers parent folder check automatically
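
A minimal sketch of that ownership rule, assuming every filer sees the same ordered peer list (hash/fnv stands in here for the util.HashStringToLong helper the real code uses):

package empty_folder_cleanup

import "hash/fnv"

// ownsFolderSketch returns true when this filer is the consistent-hash
// owner of the folder. Every peer evaluates the same function over the
// same peer list, so exactly one peer claims each folder.
func ownsFolderSketch(folder, self string, peers []string) bool {
	if len(peers) <= 1 {
		return true // a single filer owns everything
	}
	h := fnv.New64a()
	h.Write([]byte(folder)) // fnv writes never return an error
	return peers[h.Sum64()%uint64(len(peers))] == self
}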

* filer: add CleanupQueue data structure for deduplicated folder cleanup

CleanupQueue uses a linked list for FIFO ordering and a hashmap for O(1)
deduplication. Processing is triggered when:
- Queue size reaches maxSize (default 1000), OR
- Oldest item exceeds maxAge (default 10 minutes)

Key features:
- O(1) Add, Remove, Pop, Contains operations (usage sketched below)
- Duplicate folders are ignored (keeps original position/time)
- Testable with injectable time function
- Thread-safe with mutex protection
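
A usage sketch of the API (illustrative only; it uses the final Add(folder, eventTime) signature that a later commit below introduces):

package empty_folder_cleanup

import (
	"fmt"
	"time"
)

func sketchQueueUsage() {
	q := NewCleanupQueue(1000, 10*time.Minute)
	q.Add("/buckets/b1/photos", time.Now()) // new folder: returns true
	q.Add("/buckets/b1/photos", time.Now()) // duplicate: O(1) dedup via the map
	for q.ShouldProcess() { // queue full, or oldest entry older than maxAge
		folder, ok := q.Pop() // oldest event time first
		if !ok {
			break
		}
		fmt.Println("cleanup candidate:", folder)
	}
}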

* filer: use CleanupQueue for empty folder cleanup

Replace timer-per-folder approach with queue-based processing:
- Use CleanupQueue for deduplication and ordered processing
- Process queue when full (1000 items) or oldest item exceeds 10 minutes
- Background processor checks queue every 10 seconds (loop sketched below)
- Remove from queue on create events to cancel pending cleanup

Benefits:
- Bounded memory: a single size-triggered queue replaces unlimited per-folder timers
- Efficient: O(1) add/remove/contains operations
- Batch processing: handle many folders efficiently
- Better for high-volume delete scenarios
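
Roughly, the processor reduces to the loop below (a simplified sketch of what cleanupProcessor in empty_folder_cleaner.go implements):

package empty_folder_cleanup

import "time"

// runProcessorSketch drains the queue only while ShouldProcess reports
// that it is full or that its oldest entry has aged out; otherwise it
// sleeps until the next tick.
func runProcessorSketch(q *CleanupQueue, cleanup func(folder string), stop <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second) // queue check interval
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			for q.ShouldProcess() {
				folder, ok := q.Pop()
				if !ok {
					break
				}
				cleanup(folder)
			}
		}
	}
}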

* filer: CleanupQueue.Add moves duplicate to back with updated time

When adding a folder that already exists in the queue:
- Remove it from its current position
- Add it to the back of the queue
- Update the queue time to current time

This ensures that folders with recent delete activity are processed
later, giving more time for additional deletes to occur.

* filer: CleanupQueue uses event time and inserts in sorted order

Changes:
- Add() now takes eventTime parameter instead of using current time
- Insert items in time-sorted order (oldest at front) to handle out-of-order events
- When updating duplicate with newer time, reposition to maintain sort order
- Ignore updates with older time (keep existing later time)

This ensures proper ordering when processing events from distributed filers,
where event arrival order may not match event occurrence order (see the
example below).
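
For example (a sketch against the Add/Pop API in the diff below):

package empty_folder_cleanup

import "time"

func sketchOutOfOrder() {
	base := time.Now()
	q := NewCleanupQueue(1000, 10*time.Minute)
	q.Add("/buckets/b1/y", base.Add(2*time.Second)) // arrives first, newer event
	q.Add("/buckets/b1/x", base.Add(1*time.Second)) // older event: sorted to front
	q.Add("/buckets/b1/x", base)                    // older than existing entry: ignored
	q.Add("/buckets/b1/y", base.Add(3*time.Second)) // newer duplicate: repositioned
	first, _ := q.Pop()  // "/buckets/b1/x" (base+1s)
	second, _ := q.Pop() // "/buckets/b1/y" (base+3s)
	_, _ = first, second
}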

* filer: remove unused CleanupQueue functions (SetNowFunc, GetAll)

Removed test-only functions:
- SetNowFunc: tests now use real time with past event times
- GetAll: tests now use Pop() to verify order

Kept functions used in production:
- Peek: used in filer_notify_read.go
- OldestAge: used in empty_folder_cleaner.go logging

* filer: initialize cache entry on first delete/create event

Previously, roughCount was only updated if the cache entry already
existed, but entries were only created during executeCleanup. As a
result, delete/create events that arrived before the first cleanup
were never reflected in the count.

Now the cache entry is created on the first event, so roughCount
properly tracks all changes from the start (sketched below).
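
In code, the delete path now uses a read-or-create pattern; a condensed sketch of the relevant part of OnDeleteEvent (full version in the diff below):

package empty_folder_cleanup

import "time"

func onDeleteSketch(folderCounts map[string]*folderState, directory string, eventTime time.Time) {
	state, exists := folderCounts[directory]
	if !exists {
		state = &folderState{} // first event for this folder: start tracking
		folderCounts[directory] = state
	}
	if state.roughCount > 0 {
		state.roughCount-- // never drops below zero
	}
	state.lastDelTime = eventTime
}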

* filer: skip adding to cleanup queue if roughCount > 0

If the cached roughCount indicates there are still items in the
folder, skip adding it to the cleanup queue (see the snippet below).
This avoids unnecessary queue entries and wasted cleanup checks.
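
Continuing the sketch above, the guard sits between the decrement and the queue insert (condensed from OnDeleteEvent in the diff below):

package empty_folder_cleanup

import "time"

func queueIfMaybeEmpty(q *CleanupQueue, state *folderState, directory string, eventTime time.Time) {
	if state.roughCount > 0 {
		return // cached count says the folder still has entries
	}
	q.Add(directory, eventTime) // folder might be empty: worth a real check
}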

* filer: don't create cache entry on create event

Only update roughCount if the folder is already being tracked.
New folders don't need tracking until we see a delete event.

* filer: move empty folder cleanup to its own package

- Created weed/filer/empty_folder_cleanup package
- Defined a FilerOperations interface to break a circular dependency (a mock sketch follows below)
- Added CountDirectoryEntries method to Filer
- Exported IsUnderPath and IsUnderBucketPath helper functions
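
Since the cleaner only depends on the two-method FilerOperations interface, anything satisfying it can stand in for the filer; for example, a hypothetical in-memory mock for tests (the type and field names here are illustrative, not part of the change):

package empty_folder_cleanup

import (
	"context"

	"github.com/seaweedfs/seaweedfs/weed/util"
)

// mockFiler is a hypothetical in-memory FilerOperations implementation.
type mockFiler struct {
	counts  map[string]int // folder path -> current entry count
	deleted []string       // folders the cleaner removed
}

func (m *mockFiler) CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (int, error) {
	if c := m.counts[string(dirPath)]; c < limit {
		return c, nil
	}
	return limit, nil
}

func (m *mockFiler) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error {
	m.deleted = append(m.deleted, string(p))
	return nil
}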

* filer: make isUnderPath and isUnderBucketPath private

These helpers are only used within the empty_folder_cleanup package.

Committed by Chris Lu via GitHub, commit 39ba19eea6 (branch pull/7607/merge)
Files changed (9):
   206  weed/filer/empty_folder_cleanup/cleanup_queue.go
   370  weed/filer/empty_folder_cleanup/cleanup_queue_test.go
   436  weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
   569  weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
     8  weed/filer/filer.go
    39  weed/filer/filer_notify.go
    39  weed/filer/filer_on_meta_event.go
    13  weed/filer/filer_search.go
    57  weed/s3api/s3api_object_handlers_delete.go

206
weed/filer/empty_folder_cleanup/cleanup_queue.go

@@ -0,0 +1,206 @@
package empty_folder_cleanup
import (
"container/list"
"sync"
"time"
)
// CleanupQueue manages a deduplicated queue of folders pending cleanup.
// It uses a doubly-linked list ordered by event time (oldest at front) and a map for O(1) deduplication.
// Processing is triggered when:
// - Queue size reaches maxSize, OR
// - Oldest item exceeds maxAge
type CleanupQueue struct {
mu sync.Mutex
items *list.List // Linked list of *queueItem ordered by time (front = oldest)
itemsMap map[string]*list.Element // folder -> list element for O(1) lookup
maxSize int // Max queue size before triggering cleanup
maxAge time.Duration // Max age before triggering cleanup
}
// queueItem represents an item in the cleanup queue
type queueItem struct {
folder string
queueTime time.Time
}
// NewCleanupQueue creates a new CleanupQueue with the specified limits
func NewCleanupQueue(maxSize int, maxAge time.Duration) *CleanupQueue {
return &CleanupQueue{
items: list.New(),
itemsMap: make(map[string]*list.Element),
maxSize: maxSize,
maxAge: maxAge,
}
}
// Add adds a folder to the queue with the specified event time.
// The item is inserted in time-sorted order (oldest at front) to handle out-of-order events.
// If folder already exists with an older time, the time is updated and position adjusted.
// Returns true if the folder was newly added, false if it was updated.
func (q *CleanupQueue) Add(folder string, eventTime time.Time) bool {
q.mu.Lock()
defer q.mu.Unlock()
// Check if folder already exists
if elem, exists := q.itemsMap[folder]; exists {
existingItem := elem.Value.(*queueItem)
// Only update if new event is later
if eventTime.After(existingItem.queueTime) {
// Remove from current position
q.items.Remove(elem)
// Re-insert with new time in sorted position
newElem := q.insertSorted(folder, eventTime)
q.itemsMap[folder] = newElem
}
return false
}
// Insert new folder in sorted position
elem := q.insertSorted(folder, eventTime)
q.itemsMap[folder] = elem
return true
}
// insertSorted inserts an item in the correct position to maintain time ordering (oldest at front)
func (q *CleanupQueue) insertSorted(folder string, eventTime time.Time) *list.Element {
item := &queueItem{
folder: folder,
queueTime: eventTime,
}
// Find the correct position (insert before the first item with a later time)
for elem := q.items.Back(); elem != nil; elem = elem.Prev() {
existingItem := elem.Value.(*queueItem)
if !eventTime.Before(existingItem.queueTime) {
// Insert after this element
return q.items.InsertAfter(item, elem)
}
}
// This item is the oldest, insert at front
return q.items.PushFront(item)
}
// Remove removes a specific folder from the queue (e.g., when a file is created).
// Returns true if the folder was found and removed.
func (q *CleanupQueue) Remove(folder string) bool {
q.mu.Lock()
defer q.mu.Unlock()
elem, exists := q.itemsMap[folder]
if !exists {
return false
}
q.items.Remove(elem)
delete(q.itemsMap, folder)
return true
}
// ShouldProcess returns true if the queue should be processed.
// This is true when:
// - Queue size >= maxSize, OR
// - Oldest item age > maxAge
func (q *CleanupQueue) ShouldProcess() bool {
q.mu.Lock()
defer q.mu.Unlock()
return q.shouldProcessLocked()
}
// shouldProcessLocked checks if processing is needed (caller must hold lock)
func (q *CleanupQueue) shouldProcessLocked() bool {
if q.items.Len() == 0 {
return false
}
// Check if queue is full
if q.items.Len() >= q.maxSize {
return true
}
// Check if oldest item exceeds max age
front := q.items.Front()
if front != nil {
item := front.Value.(*queueItem)
if time.Since(item.queueTime) > q.maxAge {
return true
}
}
return false
}
// Pop removes and returns the oldest folder from the queue.
// Returns the folder and true if an item was available, or empty string and false if queue is empty.
func (q *CleanupQueue) Pop() (string, bool) {
q.mu.Lock()
defer q.mu.Unlock()
front := q.items.Front()
if front == nil {
return "", false
}
item := front.Value.(*queueItem)
q.items.Remove(front)
delete(q.itemsMap, item.folder)
return item.folder, true
}
// Peek returns the oldest folder without removing it.
// Returns the folder and queue time if available, or empty values if queue is empty.
func (q *CleanupQueue) Peek() (folder string, queueTime time.Time, ok bool) {
q.mu.Lock()
defer q.mu.Unlock()
front := q.items.Front()
if front == nil {
return "", time.Time{}, false
}
item := front.Value.(*queueItem)
return item.folder, item.queueTime, true
}
// Len returns the current queue size.
func (q *CleanupQueue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.items.Len()
}
// Contains checks if a folder is in the queue.
func (q *CleanupQueue) Contains(folder string) bool {
q.mu.Lock()
defer q.mu.Unlock()
_, exists := q.itemsMap[folder]
return exists
}
// Clear removes all items from the queue.
func (q *CleanupQueue) Clear() {
q.mu.Lock()
defer q.mu.Unlock()
q.items.Init()
q.itemsMap = make(map[string]*list.Element)
}
// OldestAge returns the age of the oldest item in the queue, or 0 if empty.
func (q *CleanupQueue) OldestAge() time.Duration {
q.mu.Lock()
defer q.mu.Unlock()
front := q.items.Front()
if front == nil {
return 0
}
item := front.Value.(*queueItem)
return time.Since(item.queueTime)
}

370
weed/filer/empty_folder_cleanup/cleanup_queue_test.go

@@ -0,0 +1,370 @@
package empty_folder_cleanup
import (
"testing"
"time"
)
func TestCleanupQueue_Add(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
// Add first item
if !q.Add("/buckets/b1/folder1", now) {
t.Error("expected Add to return true for new item")
}
if q.Len() != 1 {
t.Errorf("expected len 1, got %d", q.Len())
}
// Add second item with later time
if !q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) {
t.Error("expected Add to return true for new item")
}
if q.Len() != 2 {
t.Errorf("expected len 2, got %d", q.Len())
}
// Add duplicate with newer time - should update and reposition
if q.Add("/buckets/b1/folder1", now.Add(2*time.Second)) {
t.Error("expected Add to return false for existing item")
}
if q.Len() != 2 {
t.Errorf("expected len 2 after duplicate, got %d", q.Len())
}
// folder1 should now be at the back (newer time) - verify by popping
folder1, _ := q.Pop()
folder2, _ := q.Pop()
if folder1 != "/buckets/b1/folder2" || folder2 != "/buckets/b1/folder1" {
t.Errorf("expected folder1 to be moved to back, got %s, %s", folder1, folder2)
}
}
func TestCleanupQueue_Add_OutOfOrder(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
baseTime := time.Now()
// Add items out of order
q.Add("/buckets/b1/folder3", baseTime.Add(3*time.Second))
q.Add("/buckets/b1/folder1", baseTime.Add(1*time.Second))
q.Add("/buckets/b1/folder2", baseTime.Add(2*time.Second))
// Items should be in time order (oldest first) - verify by popping
expected := []string{"/buckets/b1/folder1", "/buckets/b1/folder2", "/buckets/b1/folder3"}
for i, exp := range expected {
folder, ok := q.Pop()
if !ok || folder != exp {
t.Errorf("at index %d: expected %s, got %s", i, exp, folder)
}
}
}
func TestCleanupQueue_Add_DuplicateWithOlderTime(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
baseTime := time.Now()
// Add folder at t=5
q.Add("/buckets/b1/folder1", baseTime.Add(5*time.Second))
// Try to add same folder with older time - should NOT update
q.Add("/buckets/b1/folder1", baseTime.Add(2*time.Second))
// Time should remain at t=5
_, queueTime, _ := q.Peek()
if queueTime != baseTime.Add(5*time.Second) {
t.Errorf("expected time to remain unchanged, got %v", queueTime)
}
}
func TestCleanupQueue_Remove(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
q.Add("/buckets/b1/folder1", now)
q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
// Remove middle item
if !q.Remove("/buckets/b1/folder2") {
t.Error("expected Remove to return true for existing item")
}
if q.Len() != 2 {
t.Errorf("expected len 2, got %d", q.Len())
}
if q.Contains("/buckets/b1/folder2") {
t.Error("removed item should not be in queue")
}
// Remove non-existent item
if q.Remove("/buckets/b1/nonexistent") {
t.Error("expected Remove to return false for non-existent item")
}
// Verify order is preserved by popping
folder1, _ := q.Pop()
folder3, _ := q.Pop()
if folder1 != "/buckets/b1/folder1" || folder3 != "/buckets/b1/folder3" {
t.Errorf("unexpected order: %s, %s", folder1, folder3)
}
}
func TestCleanupQueue_Pop(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
// Pop from empty queue
folder, ok := q.Pop()
if ok {
t.Error("expected Pop to return false for empty queue")
}
if folder != "" {
t.Errorf("expected empty folder, got %s", folder)
}
// Add items and pop in order
q.Add("/buckets/b1/folder1", now)
q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
folder, ok = q.Pop()
if !ok || folder != "/buckets/b1/folder1" {
t.Errorf("expected folder1, got %s (ok=%v)", folder, ok)
}
folder, ok = q.Pop()
if !ok || folder != "/buckets/b1/folder2" {
t.Errorf("expected folder2, got %s (ok=%v)", folder, ok)
}
folder, ok = q.Pop()
if !ok || folder != "/buckets/b1/folder3" {
t.Errorf("expected folder3, got %s (ok=%v)", folder, ok)
}
// Queue should be empty now
if q.Len() != 0 {
t.Errorf("expected empty queue, got len %d", q.Len())
}
}
func TestCleanupQueue_Peek(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
// Peek empty queue
folder, _, ok := q.Peek()
if ok {
t.Error("expected Peek to return false for empty queue")
}
// Add item and peek
q.Add("/buckets/b1/folder1", now)
folder, queueTime, ok := q.Peek()
if !ok || folder != "/buckets/b1/folder1" {
t.Errorf("expected folder1, got %s (ok=%v)", folder, ok)
}
if queueTime != now {
t.Errorf("expected queue time %v, got %v", now, queueTime)
}
// Peek should not remove item
if q.Len() != 1 {
t.Errorf("Peek should not remove item, len=%d", q.Len())
}
}
func TestCleanupQueue_Contains(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
q.Add("/buckets/b1/folder1", now)
if !q.Contains("/buckets/b1/folder1") {
t.Error("expected Contains to return true")
}
if q.Contains("/buckets/b1/folder2") {
t.Error("expected Contains to return false for non-existent")
}
}
func TestCleanupQueue_ShouldProcess_MaxSize(t *testing.T) {
q := NewCleanupQueue(3, 10*time.Minute)
now := time.Now()
// Empty queue
if q.ShouldProcess() {
t.Error("empty queue should not need processing")
}
// Add items below max
q.Add("/buckets/b1/folder1", now)
q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
if q.ShouldProcess() {
t.Error("queue below max should not need processing")
}
// Add item to reach max
q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
if !q.ShouldProcess() {
t.Error("queue at max should need processing")
}
}
func TestCleanupQueue_ShouldProcess_MaxAge(t *testing.T) {
q := NewCleanupQueue(100, 100*time.Millisecond) // Short max age for testing
// Add item with old event time
oldTime := time.Now().Add(-1 * time.Second) // 1 second ago
q.Add("/buckets/b1/folder1", oldTime)
// Item is older than maxAge, should need processing
if !q.ShouldProcess() {
t.Error("old item should trigger processing")
}
// Clear and add fresh item
q.Clear()
q.Add("/buckets/b1/folder2", time.Now())
// Fresh item should not trigger processing
if q.ShouldProcess() {
t.Error("fresh item should not trigger processing")
}
}
func TestCleanupQueue_Clear(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
now := time.Now()
q.Add("/buckets/b1/folder1", now)
q.Add("/buckets/b1/folder2", now.Add(1*time.Second))
q.Add("/buckets/b1/folder3", now.Add(2*time.Second))
q.Clear()
if q.Len() != 0 {
t.Errorf("expected empty queue after Clear, got len %d", q.Len())
}
if q.Contains("/buckets/b1/folder1") {
t.Error("queue should not contain items after Clear")
}
}
func TestCleanupQueue_OldestAge(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
// Empty queue
if q.OldestAge() != 0 {
t.Error("empty queue should have zero oldest age")
}
// Add item with time in the past
oldTime := time.Now().Add(-5 * time.Minute)
q.Add("/buckets/b1/folder1", oldTime)
// Age should be approximately 5 minutes
age := q.OldestAge()
if age < 4*time.Minute || age > 6*time.Minute {
t.Errorf("expected ~5m age, got %v", age)
}
}
func TestCleanupQueue_TimeOrder(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
baseTime := time.Now()
// Add items in order
items := []string{
"/buckets/b1/a",
"/buckets/b1/b",
"/buckets/b1/c",
"/buckets/b1/d",
"/buckets/b1/e",
}
for i, item := range items {
q.Add(item, baseTime.Add(time.Duration(i)*time.Second))
}
// Pop should return in time order
for i, expected := range items {
got, ok := q.Pop()
if !ok {
t.Errorf("Pop %d: expected item, got empty", i)
}
if got != expected {
t.Errorf("Pop %d: expected %s, got %s", i, expected, got)
}
}
}
func TestCleanupQueue_DuplicateWithNewerTime(t *testing.T) {
q := NewCleanupQueue(100, 10*time.Minute)
baseTime := time.Now()
// Add items
q.Add("/buckets/b1/folder1", baseTime)
q.Add("/buckets/b1/folder2", baseTime.Add(1*time.Second))
q.Add("/buckets/b1/folder3", baseTime.Add(2*time.Second))
// Add duplicate with newer time - should update and reposition
q.Add("/buckets/b1/folder1", baseTime.Add(3*time.Second))
// folder1 should now be at the back (newest time) - verify by popping
expected := []string{"/buckets/b1/folder2", "/buckets/b1/folder3", "/buckets/b1/folder1"}
for i, exp := range expected {
folder, ok := q.Pop()
if !ok || folder != exp {
t.Errorf("at index %d: expected %s, got %s", i, exp, folder)
}
}
}
func TestCleanupQueue_Concurrent(t *testing.T) {
q := NewCleanupQueue(1000, 10*time.Minute)
done := make(chan bool)
now := time.Now()
// Concurrent adds
go func() {
for i := 0; i < 100; i++ {
q.Add("/buckets/b1/folder"+string(rune('A'+i%26)), now.Add(time.Duration(i)*time.Millisecond))
}
done <- true
}()
// Concurrent removes
go func() {
for i := 0; i < 50; i++ {
q.Remove("/buckets/b1/folder" + string(rune('A'+i%26)))
}
done <- true
}()
// Concurrent pops
go func() {
for i := 0; i < 30; i++ {
q.Pop()
}
done <- true
}()
// Concurrent reads
go func() {
for i := 0; i < 100; i++ {
q.Len()
q.Contains("/buckets/b1/folderA")
q.ShouldProcess()
}
done <- true
}()
// Wait for all goroutines
for i := 0; i < 4; i++ {
<-done
}
// Just verify no panic occurred and queue is in consistent state
_ = q.Len()
}

436
weed/filer/empty_folder_cleanup/empty_folder_cleaner.go

@@ -0,0 +1,436 @@
package empty_folder_cleanup
import (
"context"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
DefaultMaxCountCheck = 1000
DefaultCacheExpiry = 5 * time.Minute
DefaultQueueMaxSize = 1000
DefaultQueueMaxAge = 10 * time.Minute
DefaultProcessorSleep = 10 * time.Second // How often to check queue
)
// FilerOperations defines the filer operations needed by EmptyFolderCleaner
type FilerOperations interface {
CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (count int, err error)
DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error
}
// folderState tracks the state of a folder for empty folder cleanup
type folderState struct {
roughCount int // Cached rough count (up to maxCountCheck)
lastAddTime time.Time // Last time an item was added
lastDelTime time.Time // Last time an item was deleted
lastCheck time.Time // Last time we checked the actual count
}
// EmptyFolderCleaner handles asynchronous cleanup of empty folders
// Each filer owns specific folders via consistent hashing based on the peer filer list
type EmptyFolderCleaner struct {
filer FilerOperations
lockRing *lock_manager.LockRing
host pb.ServerAddress
// Folder state tracking
mu sync.RWMutex
folderCounts map[string]*folderState // Rough count cache
// Cleanup queue (thread-safe, has its own lock)
cleanupQueue *CleanupQueue
// Configuration
maxCountCheck int // Max items to count (1000)
cacheExpiry time.Duration // How long to keep cache entries
processorSleep time.Duration // How often processor checks queue
bucketPath string // e.g., "/buckets"
// Control
enabled bool
stopCh chan struct{}
}
// NewEmptyFolderCleaner creates a new EmptyFolderCleaner
func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner {
efc := &EmptyFolderCleaner{
filer: filer,
lockRing: lockRing,
host: host,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge),
maxCountCheck: DefaultMaxCountCheck,
cacheExpiry: DefaultCacheExpiry,
processorSleep: DefaultProcessorSleep,
bucketPath: bucketPath,
enabled: true,
stopCh: make(chan struct{}),
}
go efc.cacheEvictionLoop()
go efc.cleanupProcessor()
return efc
}
// SetEnabled enables or disables the cleaner
func (efc *EmptyFolderCleaner) SetEnabled(enabled bool) {
efc.mu.Lock()
defer efc.mu.Unlock()
efc.enabled = enabled
}
// IsEnabled returns whether the cleaner is enabled
func (efc *EmptyFolderCleaner) IsEnabled() bool {
efc.mu.RLock()
defer efc.mu.RUnlock()
return efc.enabled
}
// ownsFolder checks if this filer owns the folder via consistent hashing
func (efc *EmptyFolderCleaner) ownsFolder(folder string) bool {
servers := efc.lockRing.GetSnapshot()
if len(servers) <= 1 {
return true // Single filer case
}
return efc.hashKeyToServer(folder, servers) == efc.host
}
// hashKeyToServer uses consistent hashing to map a folder to a server
func (efc *EmptyFolderCleaner) hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
if len(servers) == 0 {
return ""
}
x := util.HashStringToLong(key)
if x < 0 {
x = -x
}
x = x % int64(len(servers))
return servers[x]
}
// OnDeleteEvent is called when a file or directory is deleted
// Both file and directory deletions count towards making the parent folder empty
// eventTime is the time when the delete event occurred (for proper ordering)
func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, isDirectory bool, eventTime time.Time) {
// Skip if not under bucket path (must be at least /buckets/<bucket>/...)
if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
return
}
// Check if we own this folder
if !efc.ownsFolder(directory) {
glog.V(4).Infof("EmptyFolderCleaner: not owner of %s, skipping", directory)
return
}
efc.mu.Lock()
defer efc.mu.Unlock()
// Check enabled inside lock to avoid race with Stop()
if !efc.enabled {
return
}
glog.V(3).Infof("EmptyFolderCleaner: delete event in %s/%s (isDir=%v)", directory, entryName, isDirectory)
// Update cached count (create entry if needed)
state, exists := efc.folderCounts[directory]
if !exists {
state = &folderState{}
efc.folderCounts[directory] = state
}
if state.roughCount > 0 {
state.roughCount--
}
state.lastDelTime = eventTime
// Only add to cleanup queue if roughCount suggests folder might be empty
if state.roughCount > 0 {
glog.V(3).Infof("EmptyFolderCleaner: skipping queue for %s, roughCount=%d", directory, state.roughCount)
return
}
// Add to cleanup queue with event time (handles out-of-order events)
if efc.cleanupQueue.Add(directory, eventTime) {
glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
}
}
// OnCreateEvent is called when a file or directory is created
// Both file and directory creations cancel pending cleanup for the parent folder
func (efc *EmptyFolderCleaner) OnCreateEvent(directory string, entryName string, isDirectory bool) {
// Skip if not under bucket path (must be at least /buckets/<bucket>/...)
if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) {
return
}
efc.mu.Lock()
defer efc.mu.Unlock()
// Check enabled inside lock to avoid race with Stop()
if !efc.enabled {
return
}
// Update cached count only if already tracked (no need to track new folders)
if state, exists := efc.folderCounts[directory]; exists {
state.roughCount++
state.lastAddTime = time.Now()
}
// Remove from cleanup queue (cancel pending cleanup)
if efc.cleanupQueue.Remove(directory) {
glog.V(3).Infof("EmptyFolderCleaner: cancelled cleanup for %s due to new entry", directory)
}
}
// cleanupProcessor runs in background and processes the cleanup queue
func (efc *EmptyFolderCleaner) cleanupProcessor() {
ticker := time.NewTicker(efc.processorSleep)
defer ticker.Stop()
for {
select {
case <-efc.stopCh:
return
case <-ticker.C:
efc.processCleanupQueue()
}
}
}
// processCleanupQueue processes items from the cleanup queue
func (efc *EmptyFolderCleaner) processCleanupQueue() {
// Check if we should process
if !efc.cleanupQueue.ShouldProcess() {
return
}
glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, age=%v)",
efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge())
// Process all items that are ready
for efc.cleanupQueue.Len() > 0 {
// Check if still enabled
if !efc.IsEnabled() {
return
}
// Pop the oldest item
folder, ok := efc.cleanupQueue.Pop()
if !ok {
break
}
// Execute cleanup for this folder
efc.executeCleanup(folder)
// If queue is no longer full and oldest item is not old enough, stop processing
if !efc.cleanupQueue.ShouldProcess() {
break
}
}
}
// executeCleanup performs the actual cleanup of an empty folder
func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
efc.mu.Lock()
// Quick check: if we have cached count and it's > 0, skip
if state, exists := efc.folderCounts[folder]; exists {
if state.roughCount > 0 {
glog.V(3).Infof("EmptyFolderCleaner: skipping %s, cached count=%d", folder, state.roughCount)
efc.mu.Unlock()
return
}
// If there was an add after our delete, skip
if !state.lastAddTime.IsZero() && state.lastAddTime.After(state.lastDelTime) {
glog.V(3).Infof("EmptyFolderCleaner: skipping %s, add happened after delete", folder)
efc.mu.Unlock()
return
}
}
efc.mu.Unlock()
// Re-check ownership (topology might have changed)
if !efc.ownsFolder(folder) {
glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s, skipping", folder)
return
}
// Check if folder is actually empty (count up to maxCountCheck)
ctx := context.Background()
count, err := efc.countItems(ctx, folder)
if err != nil {
glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err)
return
}
efc.mu.Lock()
// Update cache
if _, exists := efc.folderCounts[folder]; !exists {
efc.folderCounts[folder] = &folderState{}
}
efc.folderCounts[folder].roughCount = count
efc.folderCounts[folder].lastCheck = time.Now()
efc.mu.Unlock()
if count > 0 {
glog.V(3).Infof("EmptyFolderCleaner: folder %s has %d items, not empty", folder, count)
return
}
// Delete the empty folder
glog.V(2).Infof("EmptyFolderCleaner: deleting empty folder %s", folder)
if err := efc.deleteFolder(ctx, folder); err != nil {
glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s: %v", folder, err)
return
}
// Clean up cache entry
efc.mu.Lock()
delete(efc.folderCounts, folder)
efc.mu.Unlock()
// Note: No need to recursively check parent folder here.
// The deletion of this folder will generate a metadata event,
// which will trigger OnDeleteEvent for the parent folder.
}
// countItems counts items in a folder (up to maxCountCheck)
func (efc *EmptyFolderCleaner) countItems(ctx context.Context, folder string) (int, error) {
return efc.filer.CountDirectoryEntries(ctx, util.FullPath(folder), efc.maxCountCheck)
}
// deleteFolder deletes an empty folder
func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) error {
return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0)
}
// isUnderPath checks if child is under parent path
func isUnderPath(child, parent string) bool {
if parent == "" || parent == "/" {
return true
}
// Ensure parent ends without slash for proper prefix matching
if len(parent) > 0 && parent[len(parent)-1] == '/' {
parent = parent[:len(parent)-1]
}
// Child must start with parent and then have a / or be exactly parent
if len(child) < len(parent) {
return false
}
if child[:len(parent)] != parent {
return false
}
if len(child) == len(parent) {
return true
}
return child[len(parent)] == '/'
}
// isUnderBucketPath checks if directory is inside a bucket (under /buckets/<bucket>/...)
// This ensures we only clean up folders inside buckets, not the buckets themselves
func isUnderBucketPath(directory, bucketPath string) bool {
if bucketPath == "" {
return true
}
// Ensure bucketPath ends without slash
if len(bucketPath) > 0 && bucketPath[len(bucketPath)-1] == '/' {
bucketPath = bucketPath[:len(bucketPath)-1]
}
// Directory must be under bucketPath
if !isUnderPath(directory, bucketPath) {
return false
}
// Directory must be at least /buckets/<bucket>/<something>
// i.e., depth must be at least bucketPath depth + 2
// For /buckets (depth 1), we need at least /buckets/mybucket/folder (depth 3)
bucketPathDepth := strings.Count(bucketPath, "/")
directoryDepth := strings.Count(directory, "/")
return directoryDepth >= bucketPathDepth+2
}
// cacheEvictionLoop periodically removes stale entries from folderCounts
func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
ticker := time.NewTicker(efc.cacheExpiry)
defer ticker.Stop()
for {
select {
case <-efc.stopCh:
return
case <-ticker.C:
efc.evictStaleCacheEntries()
}
}
}
// evictStaleCacheEntries removes cache entries that haven't been accessed recently
func (efc *EmptyFolderCleaner) evictStaleCacheEntries() {
efc.mu.Lock()
defer efc.mu.Unlock()
now := time.Now()
expiredCount := 0
for folder, state := range efc.folderCounts {
// Skip if folder is in cleanup queue
if efc.cleanupQueue.Contains(folder) {
continue
}
// Find the most recent activity time for this folder
lastActivity := state.lastCheck
if state.lastAddTime.After(lastActivity) {
lastActivity = state.lastAddTime
}
if state.lastDelTime.After(lastActivity) {
lastActivity = state.lastDelTime
}
// Evict if no activity within cache expiry period
if now.Sub(lastActivity) > efc.cacheExpiry {
delete(efc.folderCounts, folder)
expiredCount++
}
}
if expiredCount > 0 {
glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount)
}
}
// Stop stops the cleaner and cancels all pending tasks
func (efc *EmptyFolderCleaner) Stop() {
close(efc.stopCh)
efc.mu.Lock()
defer efc.mu.Unlock()
efc.enabled = false
efc.cleanupQueue.Clear()
efc.folderCounts = make(map[string]*folderState) // Clear cache on stop
}
// GetPendingCleanupCount returns the number of pending cleanup tasks (for testing)
func (efc *EmptyFolderCleaner) GetPendingCleanupCount() int {
return efc.cleanupQueue.Len()
}
// GetCachedFolderCount returns the cached count for a folder (for testing)
func (efc *EmptyFolderCleaner) GetCachedFolderCount(folder string) (int, bool) {
efc.mu.RLock()
defer efc.mu.RUnlock()
if state, exists := efc.folderCounts[folder]; exists {
return state.roughCount, true
}
return 0, false
}

569
weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go

@@ -0,0 +1,569 @@
package empty_folder_cleanup
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/pb"
)
func Test_isUnderPath(t *testing.T) {
tests := []struct {
name string
child string
parent string
expected bool
}{
{"child under parent", "/buckets/mybucket/folder/file.txt", "/buckets", true},
{"child is parent", "/buckets", "/buckets", true},
{"child not under parent", "/other/path", "/buckets", false},
{"empty parent", "/any/path", "", true},
{"root parent", "/any/path", "/", true},
{"parent with trailing slash", "/buckets/mybucket", "/buckets/", true},
{"similar prefix but not under", "/buckets-other/file", "/buckets", false},
{"deeply nested", "/buckets/a/b/c/d/e/f", "/buckets/a/b", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isUnderPath(tt.child, tt.parent)
if result != tt.expected {
t.Errorf("isUnderPath(%q, %q) = %v, want %v", tt.child, tt.parent, result, tt.expected)
}
})
}
}
func Test_isUnderBucketPath(t *testing.T) {
tests := []struct {
name string
directory string
bucketPath string
expected bool
}{
// Should NOT process - bucket path itself
{"bucket path itself", "/buckets", "/buckets", false},
// Should NOT process - bucket directory (immediate child)
{"bucket directory", "/buckets/mybucket", "/buckets", false},
// Should process - folder inside bucket
{"folder in bucket", "/buckets/mybucket/folder", "/buckets", true},
// Should process - nested folder
{"nested folder", "/buckets/mybucket/a/b/c", "/buckets", true},
// Should NOT process - outside buckets
{"outside buckets", "/other/path", "/buckets", false},
// Empty bucket path allows all
{"empty bucket path", "/any/path", "", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isUnderBucketPath(tt.directory, tt.bucketPath)
if result != tt.expected {
t.Errorf("isUnderBucketPath(%q, %q) = %v, want %v", tt.directory, tt.bucketPath, result, tt.expected)
}
})
}
}
func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
// Create a LockRing with multiple servers
lockRing := lock_manager.NewLockRing(5 * time.Second)
servers := []pb.ServerAddress{
"filer1:8888",
"filer2:8888",
"filer3:8888",
}
lockRing.SetSnapshot(servers)
// Create cleaner for filer1
cleaner1 := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
}
// Create cleaner for filer2
cleaner2 := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer2:8888",
}
// Create cleaner for filer3
cleaner3 := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer3:8888",
}
// Test that exactly one filer owns each folder
testFolders := []string{
"/buckets/mybucket/folder1",
"/buckets/mybucket/folder2",
"/buckets/mybucket/folder3",
"/buckets/mybucket/a/b/c",
"/buckets/otherbucket/x",
}
for _, folder := range testFolders {
ownCount := 0
if cleaner1.ownsFolder(folder) {
ownCount++
}
if cleaner2.ownsFolder(folder) {
ownCount++
}
if cleaner3.ownsFolder(folder) {
ownCount++
}
if ownCount != 1 {
t.Errorf("folder %q owned by %d filers, expected exactly 1", folder, ownCount)
}
}
}
func TestEmptyFolderCleaner_ownsFolder_singleServer(t *testing.T) {
// Create a LockRing with a single server
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
}
// Single filer should own all folders
testFolders := []string{
"/buckets/mybucket/folder1",
"/buckets/mybucket/folder2",
"/buckets/otherbucket/x",
}
for _, folder := range testFolders {
if !cleaner.ownsFolder(folder) {
t.Errorf("single filer should own folder %q", folder)
}
}
}
func TestEmptyFolderCleaner_ownsFolder_emptyRing(t *testing.T) {
// Create an empty LockRing
lockRing := lock_manager.NewLockRing(5 * time.Second)
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
}
// With empty ring, should own all folders
if !cleaner.ownsFolder("/buckets/mybucket/folder") {
t.Error("should own folder with empty ring")
}
}
func TestEmptyFolderCleaner_OnCreateEvent_cancelsCleanup(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/testfolder"
now := time.Now()
// Simulate delete event
cleaner.OnDeleteEvent(folder, "file.txt", false, now)
// Check that cleanup is queued
if cleaner.GetPendingCleanupCount() != 1 {
t.Errorf("expected 1 pending cleanup, got %d", cleaner.GetPendingCleanupCount())
}
// Simulate create event
cleaner.OnCreateEvent(folder, "newfile.txt", false)
// Check that cleanup is cancelled
if cleaner.GetPendingCleanupCount() != 0 {
t.Errorf("expected 0 pending cleanups after create, got %d", cleaner.GetPendingCleanupCount())
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_OnDeleteEvent_deduplication(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/testfolder"
now := time.Now()
// Simulate multiple delete events for same folder
for i := 0; i < 5; i++ {
cleaner.OnDeleteEvent(folder, "file"+string(rune('0'+i))+".txt", false, now.Add(time.Duration(i)*time.Second))
}
// Check that only 1 cleanup is queued (deduplicated)
if cleaner.GetPendingCleanupCount() != 1 {
t.Errorf("expected 1 pending cleanup after deduplication, got %d", cleaner.GetPendingCleanupCount())
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_OnDeleteEvent_multipleFolders(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
now := time.Now()
// Delete files in different folders
cleaner.OnDeleteEvent("/buckets/mybucket/folder1", "file.txt", false, now)
cleaner.OnDeleteEvent("/buckets/mybucket/folder2", "file.txt", false, now.Add(1*time.Second))
cleaner.OnDeleteEvent("/buckets/mybucket/folder3", "file.txt", false, now.Add(2*time.Second))
// Each folder should be queued
if cleaner.GetPendingCleanupCount() != 3 {
t.Errorf("expected 3 pending cleanups, got %d", cleaner.GetPendingCleanupCount())
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_OnDeleteEvent_notOwner(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888", "filer2:8888"})
// Create cleaner for filer that doesn't own the folder
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
now := time.Now()
// Try many folders, looking for one that filer1 doesn't own
foundNonOwned := false
for i := 0; i < 100; i++ {
folder := "/buckets/mybucket/folder" + string(rune('0'+i%10)) + string(rune('0'+i/10))
if !cleaner.ownsFolder(folder) {
// This folder is not owned by filer1
cleaner.OnDeleteEvent(folder, "file.txt", false, now)
if cleaner.GetPendingCleanupCount() != 0 {
t.Errorf("non-owner should not queue cleanup for folder %s", folder)
}
foundNonOwned = true
break
}
}
if !foundNonOwned {
t.Skip("could not find a folder not owned by filer1")
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_OnDeleteEvent_disabled(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: false, // Disabled
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/testfolder"
now := time.Now()
// Simulate delete event
cleaner.OnDeleteEvent(folder, "file.txt", false, now)
// Check that no cleanup is queued when disabled
if cleaner.GetPendingCleanupCount() != 0 {
t.Errorf("disabled cleaner should not queue cleanup, got %d", cleaner.GetPendingCleanupCount())
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_OnDeleteEvent_directoryDeletion(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/testfolder"
now := time.Now()
// Simulate directory delete event - should trigger cleanup
// because subdirectory deletion also makes parent potentially empty
cleaner.OnDeleteEvent(folder, "subdir", true, now)
// Check that cleanup IS queued for directory deletion
if cleaner.GetPendingCleanupCount() != 1 {
t.Errorf("directory deletion should trigger cleanup, got %d", cleaner.GetPendingCleanupCount())
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_cachedCounts(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/testfolder"
// Initialize cached count
cleaner.folderCounts[folder] = &folderState{roughCount: 5}
// Simulate create events
cleaner.OnCreateEvent(folder, "newfile1.txt", false)
cleaner.OnCreateEvent(folder, "newfile2.txt", false)
// Check cached count increased
count, exists := cleaner.GetCachedFolderCount(folder)
if !exists {
t.Error("cached folder count should exist")
}
if count != 7 {
t.Errorf("expected cached count 7, got %d", count)
}
// Simulate delete events
now := time.Now()
cleaner.OnDeleteEvent(folder, "file1.txt", false, now)
cleaner.OnDeleteEvent(folder, "file2.txt", false, now.Add(1*time.Second))
// Check cached count decreased
count, exists = cleaner.GetCachedFolderCount(folder)
if !exists {
t.Error("cached folder count should exist")
}
if count != 5 {
t.Errorf("expected cached count 5, got %d", count)
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_Stop(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
now := time.Now()
// Queue some cleanups
cleaner.OnDeleteEvent("/buckets/mybucket/folder1", "file1.txt", false, now)
cleaner.OnDeleteEvent("/buckets/mybucket/folder2", "file2.txt", false, now.Add(1*time.Second))
cleaner.OnDeleteEvent("/buckets/mybucket/folder3", "file3.txt", false, now.Add(2*time.Second))
// Verify cleanups are queued
if cleaner.GetPendingCleanupCount() < 1 {
t.Error("expected at least 1 pending cleanup before stop")
}
// Stop the cleaner
cleaner.Stop()
// Verify all cleanups are cancelled
if cleaner.GetPendingCleanupCount() != 0 {
t.Errorf("expected 0 pending cleanups after stop, got %d", cleaner.GetPendingCleanupCount())
}
}
func TestEmptyFolderCleaner_cacheEviction(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
cacheExpiry: 100 * time.Millisecond, // Short expiry for testing
stopCh: make(chan struct{}),
}
folder1 := "/buckets/mybucket/folder1"
folder2 := "/buckets/mybucket/folder2"
folder3 := "/buckets/mybucket/folder3"
// Add some cache entries with old timestamps
oldTime := time.Now().Add(-1 * time.Hour)
cleaner.folderCounts[folder1] = &folderState{roughCount: 5, lastCheck: oldTime}
cleaner.folderCounts[folder2] = &folderState{roughCount: 3, lastCheck: oldTime}
// folder3 has recent activity
cleaner.folderCounts[folder3] = &folderState{roughCount: 2, lastCheck: time.Now()}
// Verify all entries exist
if len(cleaner.folderCounts) != 3 {
t.Errorf("expected 3 cache entries, got %d", len(cleaner.folderCounts))
}
// Run eviction
cleaner.evictStaleCacheEntries()
// Verify stale entries are evicted
if len(cleaner.folderCounts) != 1 {
t.Errorf("expected 1 cache entry after eviction, got %d", len(cleaner.folderCounts))
}
// Verify the recent entry still exists
if _, exists := cleaner.folderCounts[folder3]; !exists {
t.Error("expected folder3 to still exist in cache")
}
// Verify stale entries are removed
if _, exists := cleaner.folderCounts[folder1]; exists {
t.Error("expected folder1 to be evicted")
}
if _, exists := cleaner.folderCounts[folder2]; exists {
t.Error("expected folder2 to be evicted")
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_cacheEviction_skipsEntriesInQueue(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
cacheExpiry: 100 * time.Millisecond,
stopCh: make(chan struct{}),
}
folder := "/buckets/mybucket/folder"
oldTime := time.Now().Add(-1 * time.Hour)
// Add a stale cache entry
cleaner.folderCounts[folder] = &folderState{roughCount: 0, lastCheck: oldTime}
// Also add to cleanup queue
cleaner.cleanupQueue.Add(folder, time.Now())
// Run eviction
cleaner.evictStaleCacheEntries()
// Verify entry is NOT evicted because it's in cleanup queue
if _, exists := cleaner.folderCounts[folder]; !exists {
t.Error("expected folder to still exist in cache (is in cleanup queue)")
}
cleaner.Stop()
}
func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
cleaner := &EmptyFolderCleaner{
lockRing: lockRing,
host: "filer1:8888",
bucketPath: "/buckets",
enabled: true,
folderCounts: make(map[string]*folderState),
cleanupQueue: NewCleanupQueue(1000, 10*time.Minute),
stopCh: make(chan struct{}),
}
now := time.Now()
// Add folders in order
folders := []string{
"/buckets/mybucket/folder1",
"/buckets/mybucket/folder2",
"/buckets/mybucket/folder3",
}
for i, folder := range folders {
cleaner.OnDeleteEvent(folder, "file.txt", false, now.Add(time.Duration(i)*time.Second))
}
// Verify queue length
if cleaner.GetPendingCleanupCount() != 3 {
t.Errorf("expected 3 queued folders, got %d", cleaner.GetPendingCleanupCount())
}
// Verify time-sorted order by popping
for i, expected := range folders {
folder, ok := cleaner.cleanupQueue.Pop()
if !ok || folder != expected {
t.Errorf("expected folder %s at index %d, got %s", expected, i, folder)
}
}
cleaner.Stop()
}

8
weed/filer/filer.go

@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/filer/empty_folder_cleanup"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -56,6 +57,7 @@ type Filer struct {
MaxFilenameLength uint32
deletionQuit chan struct{}
DeletionRetryQueue *DeletionRetryQueue
EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -116,6 +118,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
f.Dlm.LockRing.SetSnapshot(snapshot)
glog.V(0).Infof("%s aggregate from peers %+v", self, snapshot)
// Initialize the empty folder cleaner using the same LockRing as Dlm for consistent hashing
f.EmptyFolderCleaner = empty_folder_cleanup.NewEmptyFolderCleaner(f, f.Dlm.LockRing, self, f.DirBucketsPath)
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.FilerType {
@@ -506,6 +511,9 @@ func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bo
func (f *Filer) Shutdown() {
close(f.deletionQuit)
if f.EmptyFolderCleaner != nil {
f.EmptyFolderCleaner.Stop()
}
f.LocalMetaLogBuffer.ShutdownLogBuffer()
f.Store.Shutdown()
}

39
weed/filer/filer_notify.go

@@ -66,6 +66,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
f.logMetaEvent(ctx, fullpath, eventNotification)
// Trigger empty folder cleanup for local events
// Remote events are handled via MetaAggregator.onMetadataChangeEvent
f.triggerLocalEmptyFolderCleanup(oldEntry, newEntry)
}
func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) {
@@ -89,6 +93,41 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
}
// triggerLocalEmptyFolderCleanup triggers empty folder cleanup for local events
// This is needed because onMetadataChangeEvent is only called for remote peer events
func (f *Filer) triggerLocalEmptyFolderCleanup(oldEntry, newEntry *Entry) {
if f.EmptyFolderCleaner == nil || !f.EmptyFolderCleaner.IsEnabled() {
return
}
eventTime := time.Now()
// Handle delete events (oldEntry exists, newEntry is nil)
if oldEntry != nil && newEntry == nil {
dir, name := oldEntry.FullPath.DirAndName()
f.EmptyFolderCleaner.OnDeleteEvent(dir, name, oldEntry.IsDirectory(), eventTime)
}
// Handle create events (oldEntry is nil, newEntry exists)
if oldEntry == nil && newEntry != nil {
dir, name := newEntry.FullPath.DirAndName()
f.EmptyFolderCleaner.OnCreateEvent(dir, name, newEntry.IsDirectory())
}
// Handle rename/move events (both exist but paths differ)
if oldEntry != nil && newEntry != nil {
oldDir, oldName := oldEntry.FullPath.DirAndName()
newDir, newName := newEntry.FullPath.DirAndName()
if oldDir != newDir || oldName != newName {
// Treat old location as delete
f.EmptyFolderCleaner.OnDeleteEvent(oldDir, oldName, oldEntry.IsDirectory(), eventTime)
// Treat new location as create
f.EmptyFolderCleaner.OnCreateEvent(newDir, newName, newEntry.IsDirectory())
}
}
}
func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
if len(buf) == 0 {

39
weed/filer/filer_on_meta_event.go

@@ -2,6 +2,7 @@ package filer
import (
"bytes"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -13,6 +14,7 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
f.maybeReloadFilerConfiguration(event)
f.maybeReloadRemoteStorageConfigurationAndMapping(event)
f.onBucketEvents(event)
f.onEmptyFolderCleanupEvents(event)
}
func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
@@ -32,6 +34,43 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
}
}
// onEmptyFolderCleanupEvents handles create/delete events for empty folder cleanup
func (f *Filer) onEmptyFolderCleanupEvents(event *filer_pb.SubscribeMetadataResponse) {
if f.EmptyFolderCleaner == nil || !f.EmptyFolderCleaner.IsEnabled() {
return
}
message := event.EventNotification
directory := event.Directory
eventTime := time.Unix(0, event.TsNs)
// Handle delete events - trigger folder cleanup check
if filer_pb.IsDelete(event) && message.OldEntry != nil {
f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime)
}
// Handle create events - cancel pending cleanup for the folder
if filer_pb.IsCreate(event) && message.NewEntry != nil {
f.EmptyFolderCleaner.OnCreateEvent(directory, message.NewEntry.Name, message.NewEntry.IsDirectory)
}
// Handle rename/move events
if filer_pb.IsRename(event) {
// Treat the old location as a delete
if message.OldEntry != nil {
f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime)
}
// Treat the new location as a create
if message.NewEntry != nil {
newDir := message.NewParentPath
if newDir == "" {
newDir = directory
}
f.EmptyFolderCleaner.OnCreateEvent(newDir, message.NewEntry.Name, message.NewEntry.IsDirectory)
}
}
}
func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {
if DirectoryEtcSeaweedFS != event.Directory {
if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {

13
weed/filer/filer_search.go

@@ -41,6 +41,19 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
return entries, hasMore, err
}
// CountDirectoryEntries counts entries in a directory up to limit
func (f *Filer) CountDirectoryEntries(ctx context.Context, p util.FullPath, limit int) (count int, err error) {
entries, hasMore, err := f.ListDirectoryEntries(ctx, p, "", false, int64(limit), "", "", "")
if err != nil {
return 0, err
}
count = len(entries)
if hasMore {
count = limit // At least this many
}
return count, nil
}
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {

57
weed/s3api/s3api_object_handlers_delete.go

@@ -1,12 +1,10 @@
package s3api
import (
"context"
"encoding/xml"
"fmt"
"io"
"net/http"
"slices"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -127,22 +125,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Use operation context that won't be cancelled if request terminates
// This ensures deletion completes atomically to avoid inconsistent state
opCtx := context.WithoutCancel(r.Context())
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
return err
}
// Cleanup empty directories
if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Recursively delete empty parent directories, stop at bucket path
filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
}
return nil
return doDeleteEntry(client, dir, name, true, false)
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -222,8 +207,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
directoriesWithDeletion := make(map[string]bool)
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
@@ -245,10 +228,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Use operation context that won't be cancelled if request terminates
// This ensures batch deletion completes atomically to avoid inconsistent state
opCtx := context.WithoutCancel(r.Context())
// delete file entries
for _, object := range deleteObjects.Objects {
if object.Key == "" {
@@ -357,10 +336,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
// Track directory for empty directory cleanup
if !s3a.option.AllowEmptyFolder {
directoriesWithDeletion[parentDirectoryPath] = true
}
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
@@ -380,30 +355,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
// Cleanup empty directories - optimize by processing deepest first
if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Collect and sort directories by depth (deepest first) to avoid redundant checks
var allDirs []string
for dirPath := range directoriesWithDeletion {
allDirs = append(allDirs, dirPath)
}
// Sort by depth (deeper directories first)
slices.SortFunc(allDirs, func(a, b string) int {
return strings.Count(b, "/") - strings.Count(a, "/")
})
// Track already-checked directories to avoid redundant work
checked := make(map[string]bool)
for _, dirPath := range allDirs {
if !checked[dirPath] {
// Recursively delete empty parent directories, stop at bucket path
// Mark this directory and all its parents as checked during recursion
filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
}
}
}
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
return nil
})
