
Generalize empty folder cleanup for Spark temp artifacts

Branch: pull/8292/head
Chris Lu · 13 hours ago
Parent commit: bd77d9f9b3
Changed files:
1. weed/filer/empty_folder_cleanup/empty_folder_cleaner.go (68)
2. weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go (23)
3. weed/s3api/s3api_object_handlers_delete.go (27)
4. weed/s3api/s3api_object_handlers_put.go (59)

weed/filer/empty_folder_cleanup/empty_folder_cleaner.go (68)

@@ -9,8 +9,6 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
@@ -161,14 +159,9 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string,
 		return
 	}
-	// For Spark-style temporary folders, prioritize cleanup on the next processor tick.
-	// These paths are expected to be ephemeral and can otherwise accumulate quickly.
-	queueTime := eventTime
-	if containsTemporaryPathSegment(directory) {
-		queueTime = eventTime.Add(-efc.cleanupQueue.maxAge)
-	}
-	// Add to cleanup queue with event time (handles out-of-order events)
+	// Queue with an age that is immediately eligible at the next processor tick.
+	// This keeps empty-folder cleanup responsive while preserving queue ordering/dedup.
+	queueTime := eventTime.Add(-efc.cleanupQueue.maxAge)
 	if efc.cleanupQueue.Add(directory, queueTime) {
 		glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
 	}
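Note on the hunk above: instead of special-casing `_temporary` paths, every delete event is now queued backdated by `maxAge`, so the folder is already past the age threshold at the next processor tick while the queue still dedups and orders entries. A minimal runnable sketch of the backdating idea, using a hypothetical cleanupQueue type rather than SeaweedFS's actual implementation:

package main

import (
	"fmt"
	"time"
)

// cleanupQueue is a hypothetical stand-in for the cleaner's internal queue:
// it dedups by folder path and keeps the earliest queued time per folder.
type cleanupQueue struct {
	maxAge  time.Duration
	entries map[string]time.Time
}

// Add queues a folder for cleanup; it returns false when the folder is
// already queued with an earlier-or-equal time (dedup keeps the oldest entry).
func (q *cleanupQueue) Add(folder string, t time.Time) bool {
	if prev, ok := q.entries[folder]; ok && !t.Before(prev) {
		return false
	}
	q.entries[folder] = t
	return true
}

// PopExpired returns and removes folders whose queued time is at least maxAge old.
func (q *cleanupQueue) PopExpired(now time.Time) []string {
	var due []string
	for folder, t := range q.entries {
		if now.Sub(t) >= q.maxAge {
			due = append(due, folder)
			delete(q.entries, folder)
		}
	}
	return due
}

func main() {
	q := &cleanupQueue{maxAge: time.Minute, entries: map[string]time.Time{}}
	eventTime := time.Now()
	// Backdating by maxAge makes the entry eligible on the very next tick.
	q.Add("/buckets/b/output/_temporary/0", eventTime.Add(-q.maxAge))
	fmt.Println(q.PopExpired(time.Now()))
}

In this sketch, dedup keeps the oldest queued time, so repeated delete events for the same folder cannot push its cleanup further into the future.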
@@ -276,48 +269,8 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
 		return
 	}
-	// Check for explicit implicit_dir attribute
-	// First check cache
-	ctx := context.Background()
-	efc.mu.RLock()
-	var cachedImplicit *bool
-	if state, exists := efc.folderCounts[folder]; exists {
-		cachedImplicit = state.isImplicit
-	}
-	efc.mu.RUnlock()
-	var isImplicit bool
-	if cachedImplicit != nil {
-		isImplicit = *cachedImplicit
-	} else {
-		// Not cached, check filer
-		attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(folder))
-		if err != nil {
-			if err == filer_pb.ErrNotFound {
-				return
-			}
-			glog.V(2).Infof("EmptyFolderCleaner: error getting attributes for %s: %v", folder, err)
-			return
-		}
-		isImplicit = attrs != nil && string(attrs[s3_constants.ExtS3ImplicitDir]) == "true"
-		// Update cache
-		efc.mu.Lock()
-		if _, exists := efc.folderCounts[folder]; !exists {
-			efc.folderCounts[folder] = &folderState{}
-		}
-		efc.folderCounts[folder].isImplicit = &isImplicit
-		efc.mu.Unlock()
-	}
-	isTemporaryWorkPath := containsTemporaryPathSegment(folder)
-	if !isImplicit && !isTemporaryWorkPath {
-		glog.V(4).Infof("EmptyFolderCleaner: folder %s is not marked as implicit, skipping", folder)
-		return
-	}
 	// Check if folder is actually empty (count up to maxCountCheck)
+	ctx := context.Background()
 	count, err := efc.countItems(ctx, folder)
 	if err != nil {
 		glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err)
@@ -409,19 +362,6 @@ func isUnderBucketPath(directory, bucketPath string) bool {
 	return directoryDepth >= bucketPathDepth+2
 }
-func containsTemporaryPathSegment(path string) bool {
-	trimmed := strings.Trim(path, "/")
-	if trimmed == "" {
-		return false
-	}
-	for _, segment := range strings.Split(trimmed, "/") {
-		if segment == "_temporary" {
-			return true
-		}
-	}
-	return false
-}
 // cacheEvictionLoop periodically removes stale entries from folderCounts
 func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
 	ticker := time.NewTicker(efc.cacheExpiry)

weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go (23)

@@ -66,29 +66,6 @@ func Test_isUnderBucketPath(t *testing.T) {
 	}
 }
-func Test_containsTemporaryPathSegment(t *testing.T) {
-	tests := []struct {
-		name     string
-		path     string
-		expected bool
-	}{
-		{"spark temporary root", "/buckets/mybucket/output/_temporary", true},
-		{"spark temporary nested", "/buckets/mybucket/output/_temporary/0/task", true},
-		{"no temporary segment", "/buckets/mybucket/output/temp", false},
-		{"empty path", "", false},
-		{"root path", "/", false},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			result := containsTemporaryPathSegment(tt.path)
-			if result != tt.expected {
-				t.Errorf("containsTemporaryPathSegment(%q) = %v, want %v", tt.path, result, tt.expected)
-			}
-		})
-	}
-}
 func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
 	// Create a LockRing with multiple servers
 	lockRing := lock_manager.NewLockRing(5 * time.Second)

weed/s3api/s3api_object_handlers_delete.go (27)

@@ -132,7 +132,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
 		if deleteErr := doDeleteEntry(client, dir, name, true, false); deleteErr != nil {
 			return deleteErr
 		}
-		s3a.cleanupTemporaryParentDirectories(client, bucket, object)
+		s3a.cleanupEmptyParentDirectories(client, bucket, object)
 		return nil
 	})
@@ -356,10 +356,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
 			err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
 			if err == nil {
 				deletedObjects = append(deletedObjects, object)
-				s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
+				s3a.cleanupEmptyParentDirectories(client, bucket, object.Key)
 			} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
 				deletedObjects = append(deletedObjects, object)
-				s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
+				s3a.cleanupEmptyParentDirectories(client, bucket, object.Key)
 				if entryName != "" {
 					normalizedKey := strings.TrimSuffix(object.Key, "/")
 					if _, seen := pendingDirectoryDeleteSeen[normalizedKey]; !seen {
@@ -421,9 +421,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
 }
-func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
+func (s3a *S3ApiServer) cleanupEmptyParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
 	normalizedKey := strings.Trim(strings.TrimSpace(objectKey), "/")
-	if normalizedKey == "" || !containsTemporaryPathSegment(normalizedKey) {
+	if normalizedKey == "" {
 		return
 	}
@@ -432,12 +432,6 @@ func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
 	bucketRoot := s3a.bucketDir(bucket)
 	for parentDirectoryPath != "" && parentDirectoryPath != "/" && parentDirectoryPath != bucketRoot {
-		relativeParent := strings.TrimPrefix(parentDirectoryPath, bucketRoot)
-		relativeParent = strings.TrimPrefix(relativeParent, "/")
-		if !containsTemporaryPathSegment(relativeParent) {
-			return
-		}
 		grandParent, directoryName := util.FullPath(parentDirectoryPath).DirAndName()
 		if directoryName == "" {
 			return
@@ -455,16 +449,7 @@ func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
 			parentDirectoryPath = grandParent
 			continue
 		}
-		glog.V(2).Infof("cleanupTemporaryParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err)
+		glog.V(2).Infof("cleanupEmptyParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err)
 		return
 	}
 }
-func containsTemporaryPathSegment(path string) bool {
-	for _, segment := range strings.Split(strings.Trim(path, "/"), "/") {
-		if segment == "_temporary" {
-			return true
-		}
-	}
-	return false
-}
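The renamed cleanupEmptyParentDirectories now prunes every chain of empty parents up to the bucket root, not only chains containing _temporary. A simplified runnable sketch of the upward walk, with a hypothetical deleteIfEmpty helper standing in for doDeleteEntry and the filer client:

package main

import (
	"fmt"
	"path"
	"strings"
)

// cleanupEmptyParents walks from the deleted object's parent up toward the
// bucket root, deleting each directory that is now empty. deleteIfEmpty is a
// hypothetical helper; it returns true only when the directory was empty and
// has been removed.
func cleanupEmptyParents(bucketRoot, objectKey string, deleteIfEmpty func(dir string) bool) {
	parent := path.Dir(path.Join(bucketRoot, strings.Trim(objectKey, "/")))
	for parent != "" && parent != "/" && parent != bucketRoot {
		if !deleteIfEmpty(parent) {
			return // stop at the first ancestor that is still non-empty
		}
		parent = path.Dir(parent)
	}
}

func main() {
	// With the _temporary gate removed, any chain of now-empty parents is pruned.
	cleanupEmptyParents("/buckets/mybucket", "output/_temporary/0/task/part-0001",
		func(dir string) bool { fmt.Println("delete if empty:", dir); return true })
}

Stopping at the first non-empty ancestor (or at the bucket root) bounds the work done per deleted object.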

weed/s3api/s3api_object_handlers_put.go (59)

@@ -125,43 +125,36 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
 	if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
 		// Split the object into directory path and name
 		objectWithoutSlash := strings.TrimSuffix(object, "/")
-		if containsTemporaryPathSegment(objectWithoutSlash) {
-			// Spark and Hadoop committers may create explicit "_temporary" directory markers.
-			// Persisting these markers can accumulate stale empty directories.
-			// Skip materializing temporary markers and rely on implicit directories from actual object writes.
-			glog.V(3).Infof("PutObjectHandler: skipping temporary directory marker %s/%s", bucket, object)
-		} else {
-			dirName := path.Dir(objectWithoutSlash)
-			entryName := path.Base(objectWithoutSlash)
-			if dirName == "." {
-				dirName = ""
-			}
-			dirName = strings.TrimPrefix(dirName, "/")
-			// Construct full directory path
-			fullDirPath := s3a.bucketDir(bucket)
-			if dirName != "" {
-				fullDirPath = fullDirPath + "/" + dirName
-			}
-			if err := s3a.mkdir(
-				fullDirPath, entryName,
-				func(entry *filer_pb.Entry) {
-					if objectContentType == "" {
-						objectContentType = s3_constants.FolderMimeType
-					}
-					if r.ContentLength > 0 {
-						entry.Content, _ = io.ReadAll(r.Body)
-					}
-					entry.Attributes.Mime = objectContentType
-					// Set object owner for directory objects (same as regular objects)
-					s3a.setObjectOwnerFromRequest(r, bucket, entry)
-				}); err != nil {
-				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-				return
-			}
-		}
+		dirName := path.Dir(objectWithoutSlash)
+		entryName := path.Base(objectWithoutSlash)
+		if dirName == "." {
+			dirName = ""
+		}
+		dirName = strings.TrimPrefix(dirName, "/")
+		// Construct full directory path
+		fullDirPath := s3a.bucketDir(bucket)
+		if dirName != "" {
+			fullDirPath = fullDirPath + "/" + dirName
+		}
+		if err := s3a.mkdir(
+			fullDirPath, entryName,
+			func(entry *filer_pb.Entry) {
+				if objectContentType == "" {
+					objectContentType = s3_constants.FolderMimeType
+				}
+				if r.ContentLength > 0 {
+					entry.Content, _ = io.ReadAll(r.Body)
+				}
+				entry.Attributes.Mime = objectContentType
+				// Set object owner for directory objects (same as regular objects)
+				s3a.setObjectOwnerFromRequest(r, bucket, entry)
+			}); err != nil {
+			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+			return
+		}
 	} else {
 		// Get detailed versioning state for the bucket
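For reference, a hand-traced example of the marker-key handling that now runs unconditionally (previously, keys containing _temporary were skipped instead of being materialized). The bucket path is hypothetical:

package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	// A PUT of key "/output/_temporary/0/" with an empty body is a directory marker.
	object := "/output/_temporary/0/"
	objectWithoutSlash := strings.TrimSuffix(object, "/") // "/output/_temporary/0"
	dirName := path.Dir(objectWithoutSlash)               // "/output/_temporary"
	entryName := path.Base(objectWithoutSlash)            // "0"
	if dirName == "." {
		dirName = ""
	}
	dirName = strings.TrimPrefix(dirName, "/") // "output/_temporary"
	bucketDir := "/buckets/mybucket"           // hypothetical s3a.bucketDir(bucket)
	fullDirPath := bucketDir
	if dirName != "" {
		fullDirPath = fullDirPath + "/" + dirName
	}
	fmt.Println(fullDirPath, entryName) // /buckets/mybucket/output/_temporary 0
}

s3a.mkdir would then create entry "0" under /buckets/mybucket/output/_temporary, and the generalized empty-folder cleanup above is what later removes such markers once they are empty.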
