From 692b3a6e0776f5586492963371c0ff0ef1aca917 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Tue, 10 Feb 2026 12:55:04 -0800
Subject: [PATCH] Fix Spark _temporary cleanup and add issue #8285 regression test

---
 test/s3/spark/issue_8285_repro_test.go     | 180 ++++++++++++++++++
 .../empty_folder_cleaner.go                |  25 ++-
 .../empty_folder_cleaner_test.go           |  23 +++
 weed/s3api/s3api_object_handlers_delete.go | 106 +++++++++--
 weed/s3api/s3api_object_handlers_put.go    |  59 +++---
 5 files changed, 353 insertions(+), 40 deletions(-)
 create mode 100644 test/s3/spark/issue_8285_repro_test.go

diff --git a/test/s3/spark/issue_8285_repro_test.go b/test/s3/spark/issue_8285_repro_test.go
new file mode 100644
index 000000000..a514356da
--- /dev/null
+++ b/test/s3/spark/issue_8285_repro_test.go
@@ -0,0 +1,180 @@
+package spark
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+func TestSparkS3TemporaryDirectoryCleanupIssue8285Regression(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping Spark integration test in short mode")
+	}
+
+	env := setupSparkIssue8234Env(t)
+
+	script := `
+import pyspark.sql.functions as F
+
+target = "s3a://test/issue-8285/output"
+
+spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")
+spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
+spark.conf.set("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
+spark.conf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
+spark.conf.set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp")
+spark.conf.set("spark.hadoop.fs.s3a.directory.marker.retention", "keep")
+
+df = spark.range(0, 200).repartition(12).withColumn("value", F.col("id") * 2)
+df.write.format("parquet").mode("overwrite").save(target)
+count = spark.read.parquet(target).count()
+print("WRITE_COUNT=" + str(count))
+`
+
+	code, output := runSparkPyScript(t, env.sparkContainer, script, env.s3Port)
+	if code != 0 {
+		t.Fatalf("Spark script exited with code %d; output:\n%s", code, output)
+	}
+	if !strings.Contains(output, "WRITE_COUNT=200") {
+		t.Fatalf("expected write/read success marker in output, got:\n%s", output)
+	}
+
+	keys := listObjectKeysByPrefix(t, env, "test", "issue-8285/")
+	var temporaryKeys []string
+	for _, key := range keys {
+		if hasTemporaryPathSegment(key) {
+			temporaryKeys = append(temporaryKeys, key)
+		}
+	}
+
+	if len(temporaryKeys) > 0 {
+		t.Fatalf("issue #8285 regression detected: found lingering _temporary artifacts: %v\nall keys: %v", temporaryKeys, keys)
+	}
+
+	temporaryCandidates := []string{
+		"issue-8285/output/_temporary/",
+		"issue-8285/output/_temporary/0/",
+		"issue-8285/output/_temporary/0/_temporary/",
+	}
+	lingering := waitForObjectsToDisappear(t, env, "test", temporaryCandidates, 35*time.Second)
+	if len(lingering) > 0 {
+		t.Fatalf("issue #8285 regression detected: lingering temporary directories: %v", lingering)
+	}
+}
+
+func listObjectKeysByPrefix(t *testing.T, env *TestEnvironment, bucketName, prefix string) []string {
+	t.Helper()
+	client := newS3Client(env)
+
+	pager := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
+		Bucket: aws.String(bucketName),
+		Prefix: aws.String(prefix),
+	})
+
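+	// Collect every key under the prefix; any key that still carries a "_temporary"
+	// path segment after the Spark job finishes is treated as committer debris (issue #8285).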
+	var keys []string
+	for pager.HasMorePages() {
+		page, err := pager.NextPage(context.Background())
+		if err != nil {
+			t.Fatalf("failed listing objects for prefix %q: %v", prefix, err)
+		}
+		for _, object := range page.Contents {
+			keys = append(keys, aws.ToString(object.Key))
+		}
+	}
+
+	return keys
+}
+
+func headObjectInfo(t *testing.T, env *TestEnvironment, bucketName, key string) (bool, string, error) {
+	t.Helper()
+
+	client := newS3Client(env)
+	output, err := client.HeadObject(context.Background(), &s3.HeadObjectInput{
+		Bucket: aws.String(bucketName),
+		Key:    aws.String(key),
+	})
+	if err == nil {
+		return true, aws.ToString(output.ContentType), nil
+	}
+
+	var notFound *s3types.NotFound
+	if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") || errors.As(err, &notFound) {
+		return false, "", nil
+	}
+	return false, "", err
+}
+
+func waitForObjectsToDisappear(t *testing.T, env *TestEnvironment, bucketName string, keys []string, timeout time.Duration) []string {
+	t.Helper()
+
+	deadline := time.Now().Add(timeout)
+	pending := make(map[string]struct{}, len(keys))
+	details := make(map[string]string, len(keys))
+	for _, key := range keys {
+		pending[key] = struct{}{}
+	}
+
+	for len(pending) > 0 && time.Now().Before(deadline) {
+		for key := range pending {
+			exists, contentType, err := headObjectInfo(t, env, bucketName, key)
+			if err != nil {
+				details[key] = fmt.Sprintf("%s (head_error=%v)", key, err)
+				continue
+			}
+			if !exists {
+				delete(pending, key)
+				delete(details, key)
+				continue
+			}
+			details[key] = fmt.Sprintf("%s (exists=true, contentType=%q)", key, contentType)
+		}
+		if len(pending) > 0 {
+			time.Sleep(2 * time.Second)
+		}
+	}
+
+	if len(pending) == 0 {
+		return nil
+	}
+
+	var lingering []string
+	for _, key := range keys {
+		if _, ok := pending[key]; !ok {
+			continue
+		}
+		if detail, hasDetail := details[key]; hasDetail {
+			lingering = append(lingering, detail)
+		} else {
+			lingering = append(lingering, key)
+		}
+	}
+	return lingering
+}
+
+func newS3Client(env *TestEnvironment) *s3.Client {
+	cfg := aws.Config{
+		Region:       "us-east-1",
+		Credentials:  aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")),
+		BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)),
+	}
+	return s3.NewFromConfig(cfg, func(o *s3.Options) {
+		o.UsePathStyle = true
+	})
+}
+
+func hasTemporaryPathSegment(key string) bool {
+	for _, segment := range strings.Split(strings.TrimSuffix(key, "/"), "/") {
+		if segment == "_temporary" {
+			return true
+		}
+	}
+	return false
+}
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
index d98dd5ee6..f78b0cf73 100644
--- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
@@ -161,8 +161,15 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string,
 		return
 	}
 
+	// For Spark-style temporary folders, prioritize cleanup on the next processor tick.
+	// These paths are expected to be ephemeral and can otherwise accumulate quickly.
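+	// Back-dating the queue time by the queue's maxAge makes the folder eligible as soon
+	// as the cleanup queue is next processed, instead of waiting for the entry to age.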
+	queueTime := eventTime
+	if containsTemporaryPathSegment(directory) {
+		queueTime = eventTime.Add(-efc.cleanupQueue.maxAge)
+	}
+
 	// Add to cleanup queue with event time (handles out-of-order events)
-	if efc.cleanupQueue.Add(directory, eventTime) {
+	if efc.cleanupQueue.Add(directory, queueTime) {
 		glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
 	}
 }
@@ -304,7 +311,8 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
 		efc.mu.Unlock()
 	}
 
-	if !isImplicit {
+	isTemporaryWorkPath := containsTemporaryPathSegment(folder)
+	if !isImplicit && !isTemporaryWorkPath {
 		glog.V(4).Infof("EmptyFolderCleaner: folder %s is not marked as implicit, skipping", folder)
 		return
 	}
@@ -401,6 +409,19 @@ func isUnderBucketPath(directory, bucketPath string) bool {
 	return directoryDepth >= bucketPathDepth+2
 }
 
+func containsTemporaryPathSegment(path string) bool {
+	trimmed := strings.Trim(path, "/")
+	if trimmed == "" {
+		return false
+	}
+	for _, segment := range strings.Split(trimmed, "/") {
+		if segment == "_temporary" {
+			return true
+		}
+	}
+	return false
+}
+
 // cacheEvictionLoop periodically removes stale entries from folderCounts
 func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
 	ticker := time.NewTicker(efc.cacheExpiry)
diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
index 1c62a5dd1..b23a74488 100644
--- a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
+++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
@@ -66,6 +66,29 @@ func Test_isUnderBucketPath(t *testing.T) {
 	}
 }
 
+func Test_containsTemporaryPathSegment(t *testing.T) {
+	tests := []struct {
+		name     string
+		path     string
+		expected bool
+	}{
+		{"spark temporary root", "/buckets/mybucket/output/_temporary", true},
+		{"spark temporary nested", "/buckets/mybucket/output/_temporary/0/task", true},
+		{"no temporary segment", "/buckets/mybucket/output/temp", false},
+		{"empty path", "", false},
+		{"root path", "/", false},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := containsTemporaryPathSegment(tt.path)
+			if result != tt.expected {
+				t.Errorf("containsTemporaryPathSegment(%q) = %v, want %v", tt.path, result, tt.expected)
+			}
+		})
+	}
+}
+
 func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
 	// Create a LockRing with multiple servers
 	lockRing := lock_manager.NewLockRing(5 * time.Second)
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index 8bf10646b..e91c909f2 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -2,9 +2,9 @@ package s3api
 
 import (
 	"encoding/xml"
-	"fmt"
 	"io"
 	"net/http"
+	"sort"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -125,13 +125,15 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
-	target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object))
+	target := util.NewFullPath(s3a.bucketDir(bucket), object)
 	dir, name := target.DirAndName()
 
 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		return doDeleteEntry(client, dir, name, true, false)
-		// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
-		// which listens to metadata events and uses consistent hashing for coordination
+		if deleteErr := doDeleteEntry(client, dir, name, true, false); deleteErr != nil {
+			return deleteErr
+		}
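+		// Best-effort pass: prune any now-empty "_temporary" parent directories so that
+		// Spark/Hadoop committer work paths do not linger after the object is removed.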
+		s3a.cleanupTemporaryParentDirectories(client, bucket, object)
+		return nil
 	})
 
 	if err != nil {
@@ -211,6 +213,13 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 	var deletedObjects []ObjectIdentifier
 	var deleteErrors []DeleteError
 	var auditLog *s3err.AccessLog
+	type pendingDirectoryDelete struct {
+		key    string
+		parent string
+		name   string
+	}
+	var pendingDirectoryDeletes []pendingDirectoryDelete
+	pendingDirectoryDeleteSeen := make(map[string]struct{})
 
 	if s3err.Logger != nil {
 		auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
 	}
@@ -340,19 +349,28 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 				}
 			} else {
 				// Handle non-versioned delete (original logic)
-				lastSeparator := strings.LastIndex(object.Key, "/")
-				parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
-				if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
-					entryName = object.Key[lastSeparator+1:]
-					parentDirectoryPath = object.Key[:lastSeparator]
-				}
-				parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath)
+				target := util.NewFullPath(s3a.bucketDir(bucket), object.Key)
+				parentDirectoryPath, entryName := target.DirAndName()
+				isDeleteData, isRecursive := true, false
 
 				err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
 				if err == nil {
 					deletedObjects = append(deletedObjects, object)
+					s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
 				} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
 					deletedObjects = append(deletedObjects, object)
+					s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
+					if entryName != "" {
+						normalizedKey := strings.TrimSuffix(object.Key, "/")
+						if _, seen := pendingDirectoryDeleteSeen[normalizedKey]; !seen {
+							pendingDirectoryDeleteSeen[normalizedKey] = struct{}{}
+							pendingDirectoryDeletes = append(pendingDirectoryDeletes, pendingDirectoryDelete{
+								key:    normalizedKey,
+								parent: parentDirectoryPath,
+								name:   entryName,
+							})
+						}
+					}
 				} else {
 					deleteErrors = append(deleteErrors, DeleteError{
 						Code: "",
@@ -369,6 +387,22 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 			}
 		}
 
+		if len(pendingDirectoryDeletes) > 0 {
+			sort.Slice(pendingDirectoryDeletes, func(i, j int) bool {
+				return len(pendingDirectoryDeletes[i].key) > len(pendingDirectoryDeletes[j].key)
+			})
+			for _, pending := range pendingDirectoryDeletes {
+				retryErr := doDeleteEntry(client, pending.parent, pending.name, true, false)
+				if retryErr == nil {
+					continue
+				}
+				if strings.Contains(retryErr.Error(), filer.MsgFailDelNonEmptyFolder) || strings.Contains(retryErr.Error(), filer_pb.ErrNotFound.Error()) {
+					continue
+				}
+				glog.V(2).Infof("DeleteMultipleObjectsHandler: retry delete failed for %s: %v", pending.key, retryErr)
+			}
+		}
+
 		// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
 		// which listens to metadata events and uses consistent hashing for coordination
 
@@ -386,3 +420,51 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 
 	writeSuccessResponseXML(w, r, deleteResp)
 }
+
+func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
+	normalizedKey := strings.Trim(strings.TrimSpace(objectKey), "/")
+	if normalizedKey == "" || !containsTemporaryPathSegment(normalizedKey) {
+		return
+	}
+
+	target := util.NewFullPath(s3a.bucketDir(bucket), normalizedKey)
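+	// Walk upward from the object's immediate parent, deleting each directory that has
+	// become empty, and stop once the path leaves the "_temporary" subtree or reaches
+	// the bucket root.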
+	parentDirectoryPath, _ := target.DirAndName()
+	bucketRoot := s3a.bucketDir(bucket)
+
+	for parentDirectoryPath != "" && parentDirectoryPath != "/" && parentDirectoryPath != bucketRoot {
+		relativeParent := strings.TrimPrefix(parentDirectoryPath, bucketRoot)
+		relativeParent = strings.TrimPrefix(relativeParent, "/")
+		if !containsTemporaryPathSegment(relativeParent) {
+			return
+		}
+
+		grandParent, directoryName := util.FullPath(parentDirectoryPath).DirAndName()
+		if directoryName == "" {
+			return
+		}
+
+		err := doDeleteEntry(client, grandParent, directoryName, true, false)
+		if err == nil {
+			parentDirectoryPath = grandParent
+			continue
+		}
+		if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
+			return
+		}
+		if strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()) {
+			parentDirectoryPath = grandParent
+			continue
+		}
+		glog.V(2).Infof("cleanupTemporaryParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err)
+		return
+	}
+}
+
+func containsTemporaryPathSegment(path string) bool {
+	for _, segment := range strings.Split(strings.Trim(path, "/"), "/") {
+		if segment == "_temporary" {
+			return true
+		}
+	}
+	return false
+}
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 254885f22..83955047f 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -125,36 +125,43 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
 		// Split the object into directory path and name
 		objectWithoutSlash := strings.TrimSuffix(object, "/")
-		dirName := path.Dir(objectWithoutSlash)
-		entryName := path.Base(objectWithoutSlash)
+		if containsTemporaryPathSegment(objectWithoutSlash) {
+			// Spark and Hadoop committers may create explicit "_temporary" directory markers.
+			// Persisting these markers can leave stale empty directories behind.
+			// Skip materializing temporary markers and rely on implicit directories from actual object writes.
+			glog.V(3).Infof("PutObjectHandler: skipping temporary directory marker %s/%s", bucket, object)
+		} else {
+			dirName := path.Dir(objectWithoutSlash)
+			entryName := path.Base(objectWithoutSlash)
 
-		if dirName == "." {
-			dirName = ""
-		}
-		dirName = strings.TrimPrefix(dirName, "/")
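+			// path.Dir returns "." for a marker at the bucket root; normalize that to an
+			// empty relative directory before the full filer path is assembled below.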
+			if dirName == "." {
+				dirName = ""
+			}
+			dirName = strings.TrimPrefix(dirName, "/")
 
-		// Construct full directory path
-		fullDirPath := s3a.bucketDir(bucket)
-		if dirName != "" {
-			fullDirPath = fullDirPath + "/" + dirName
-		}
+			// Construct full directory path
+			fullDirPath := s3a.bucketDir(bucket)
+			if dirName != "" {
+				fullDirPath = fullDirPath + "/" + dirName
+			}
 
-		if err := s3a.mkdir(
-			fullDirPath, entryName,
-			func(entry *filer_pb.Entry) {
-				if objectContentType == "" {
-					objectContentType = s3_constants.FolderMimeType
-				}
-				if r.ContentLength > 0 {
-					entry.Content, _ = io.ReadAll(r.Body)
-				}
-				entry.Attributes.Mime = objectContentType
+			if err := s3a.mkdir(
+				fullDirPath, entryName,
+				func(entry *filer_pb.Entry) {
+					if objectContentType == "" {
+						objectContentType = s3_constants.FolderMimeType
+					}
+					if r.ContentLength > 0 {
+						entry.Content, _ = io.ReadAll(r.Body)
+					}
+					entry.Attributes.Mime = objectContentType
 
-				// Set object owner for directory objects (same as regular objects)
-				s3a.setObjectOwnerFromRequest(r, bucket, entry)
-			}); err != nil {
-			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-			return
+					// Set object owner for directory objects (same as regular objects)
+					s3a.setObjectOwnerFromRequest(r, bucket, entry)
+				}); err != nil {
+				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+				return
+			}
 		}
 	} else {
 		// Get detailed versioning state for the bucket