diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go index f78b0cf73..11a5cea44 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -9,8 +9,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -161,14 +159,9 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, return } - // For Spark-style temporary folders, prioritize cleanup on the next processor tick. - // These paths are expected to be ephemeral and can otherwise accumulate quickly. - queueTime := eventTime - if containsTemporaryPathSegment(directory) { - queueTime = eventTime.Add(-efc.cleanupQueue.maxAge) - } - - // Add to cleanup queue with event time (handles out-of-order events) + // Queue with an age that is immediately eligible at next processor tick. + // This keeps empty-folder cleanup responsive while preserving queue ordering/dedup. + queueTime := eventTime.Add(-efc.cleanupQueue.maxAge) if efc.cleanupQueue.Add(directory, queueTime) { glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory) } @@ -276,48 +269,8 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) { return } - // Check for explicit implicit_dir attribute - // First check cache - ctx := context.Background() - efc.mu.RLock() - var cachedImplicit *bool - if state, exists := efc.folderCounts[folder]; exists { - cachedImplicit = state.isImplicit - } - efc.mu.RUnlock() - - var isImplicit bool - if cachedImplicit != nil { - isImplicit = *cachedImplicit - } else { - // Not cached, check filer - attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(folder)) - if err != nil { - if err == filer_pb.ErrNotFound { - return - } - glog.V(2).Infof("EmptyFolderCleaner: error getting attributes for %s: %v", folder, err) - return - } - - isImplicit = attrs != nil && string(attrs[s3_constants.ExtS3ImplicitDir]) == "true" - - // Update cache - efc.mu.Lock() - if _, exists := efc.folderCounts[folder]; !exists { - efc.folderCounts[folder] = &folderState{} - } - efc.folderCounts[folder].isImplicit = &isImplicit - efc.mu.Unlock() - } - - isTemporaryWorkPath := containsTemporaryPathSegment(folder) - if !isImplicit && !isTemporaryWorkPath { - glog.V(4).Infof("EmptyFolderCleaner: folder %s is not marked as implicit, skipping", folder) - return - } - // Check if folder is actually empty (count up to maxCountCheck) + ctx := context.Background() count, err := efc.countItems(ctx, folder) if err != nil { glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err) @@ -409,19 +362,6 @@ func isUnderBucketPath(directory, bucketPath string) bool { return directoryDepth >= bucketPathDepth+2 } -func containsTemporaryPathSegment(path string) bool { - trimmed := strings.Trim(path, "/") - if trimmed == "" { - return false - } - for _, segment := range strings.Split(trimmed, "/") { - if segment == "_temporary" { - return true - } - } - return false -} - // cacheEvictionLoop periodically removes stale entries from folderCounts func (efc *EmptyFolderCleaner) cacheEvictionLoop() { ticker := time.NewTicker(efc.cacheExpiry) diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go index b23a74488..1c62a5dd1 100644 --- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go @@ -66,29 +66,6 @@ func Test_isUnderBucketPath(t *testing.T) { } } -func Test_containsTemporaryPathSegment(t *testing.T) { - tests := []struct { - name string - path string - expected bool - }{ - {"spark temporary root", "/buckets/mybucket/output/_temporary", true}, - {"spark temporary nested", "/buckets/mybucket/output/_temporary/0/task", true}, - {"no temporary segment", "/buckets/mybucket/output/temp", false}, - {"empty path", "", false}, - {"root path", "/", false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := containsTemporaryPathSegment(tt.path) - if result != tt.expected { - t.Errorf("containsTemporaryPathSegment(%q) = %v, want %v", tt.path, result, tt.expected) - } - }) - } -} - func TestEmptyFolderCleaner_ownsFolder(t *testing.T) { // Create a LockRing with multiple servers lockRing := lock_manager.NewLockRing(5 * time.Second) diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index e91c909f2..774452381 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -132,7 +132,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque if deleteErr := doDeleteEntry(client, dir, name, true, false); deleteErr != nil { return deleteErr } - s3a.cleanupTemporaryParentDirectories(client, bucket, object) + s3a.cleanupEmptyParentDirectories(client, bucket, object) return nil }) @@ -356,10 +356,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { deletedObjects = append(deletedObjects, object) - s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key) + s3a.cleanupEmptyParentDirectories(client, bucket, object.Key) } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { deletedObjects = append(deletedObjects, object) - s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key) + s3a.cleanupEmptyParentDirectories(client, bucket, object.Key) if entryName != "" { normalizedKey := strings.TrimSuffix(object.Key, "/") if _, seen := pendingDirectoryDeleteSeen[normalizedKey]; !seen { @@ -421,9 +421,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } -func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) { +func (s3a *S3ApiServer) cleanupEmptyParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) { normalizedKey := strings.Trim(strings.TrimSpace(objectKey), "/") - if normalizedKey == "" || !containsTemporaryPathSegment(normalizedKey) { + if normalizedKey == "" { return } @@ -432,12 +432,6 @@ func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.Seawee bucketRoot := s3a.bucketDir(bucket) for parentDirectoryPath != "" && parentDirectoryPath != "/" && parentDirectoryPath != bucketRoot { - relativeParent := strings.TrimPrefix(parentDirectoryPath, bucketRoot) - relativeParent = strings.TrimPrefix(relativeParent, "/") - if !containsTemporaryPathSegment(relativeParent) { - return - } - grandParent, directoryName := util.FullPath(parentDirectoryPath).DirAndName() if directoryName == "" { return @@ -455,16 +449,7 @@ func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.Seawee parentDirectoryPath = grandParent continue } - glog.V(2).Infof("cleanupTemporaryParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err) + glog.V(2).Infof("cleanupEmptyParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err) return } } - -func containsTemporaryPathSegment(path string) bool { - for _, segment := range strings.Split(strings.Trim(path, "/"), "/") { - if segment == "_temporary" { - return true - } - } - return false -} diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 83955047f..254885f22 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -125,43 +125,36 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 { // Split the object into directory path and name objectWithoutSlash := strings.TrimSuffix(object, "/") - if containsTemporaryPathSegment(objectWithoutSlash) { - // Spark and Hadoop committers may create explicit "_temporary" directory markers. - // Persisting these markers can accumulate stale empty directories. - // Skip materializing temporary markers and rely on implicit directories from actual object writes. - glog.V(3).Infof("PutObjectHandler: skipping temporary directory marker %s/%s", bucket, object) - } else { - dirName := path.Dir(objectWithoutSlash) - entryName := path.Base(objectWithoutSlash) + dirName := path.Dir(objectWithoutSlash) + entryName := path.Base(objectWithoutSlash) - if dirName == "." { - dirName = "" - } - dirName = strings.TrimPrefix(dirName, "/") + if dirName == "." { + dirName = "" + } + dirName = strings.TrimPrefix(dirName, "/") - // Construct full directory path - fullDirPath := s3a.bucketDir(bucket) - if dirName != "" { - fullDirPath = fullDirPath + "/" + dirName - } + // Construct full directory path + fullDirPath := s3a.bucketDir(bucket) + if dirName != "" { + fullDirPath = fullDirPath + "/" + dirName + } - if err := s3a.mkdir( - fullDirPath, entryName, - func(entry *filer_pb.Entry) { - if objectContentType == "" { - objectContentType = s3_constants.FolderMimeType - } - if r.ContentLength > 0 { - entry.Content, _ = io.ReadAll(r.Body) - } - entry.Attributes.Mime = objectContentType + if err := s3a.mkdir( + fullDirPath, entryName, + func(entry *filer_pb.Entry) { + if objectContentType == "" { + objectContentType = s3_constants.FolderMimeType + } + if r.ContentLength > 0 { + entry.Content, _ = io.ReadAll(r.Body) + } + entry.Attributes.Mime = objectContentType - // Set object owner for directory objects (same as regular objects) - s3a.setObjectOwnerFromRequest(r, bucket, entry) - }); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Set object owner for directory objects (same as regular objects) + s3a.setObjectOwnerFromRequest(r, bucket, entry) + }); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } } else { // Get detailed versioning state for the bucket