Browse Source

Fix Spark _temporary cleanup and add issue #8285 regression test

pull/8292/head
Chris Lu 1 day ago
parent
commit
692b3a6e07
  1. 180
      test/s3/spark/issue_8285_repro_test.go
  2. 25
      weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
  3. 23
      weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go
  4. 106
      weed/s3api/s3api_object_handlers_delete.go
  5. 59
      weed/s3api/s3api_object_handlers_put.go

180
test/s3/spark/issue_8285_repro_test.go

@@ -0,0 +1,180 @@
package spark
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// TestSparkS3TemporaryDirectoryCleanupIssue8285Regression runs a Spark job
// through the Hadoop S3A "directory" staging committer against the SeaweedFS
// S3 gateway and asserts that no "_temporary" staging artifacts survive the
// commit (regression test for issue #8285).
func TestSparkS3TemporaryDirectoryCleanupIssue8285Regression(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping Spark integration test in short mode")
	}
	// NOTE(review): this reuses the issue-8234 environment helper; confirm this
	// is intentional sharing of the same Spark/S3 fixture and not a typo for 8285.
	env := setupSparkIssue8234Env(t)
	// PySpark script: configure the directory staging committer, write 200 rows
	// of parquet, read them back, and print a success marker we grep for below.
	script := `
import pyspark.sql.functions as F
target = "s3a://test/issue-8285/output"
spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")
spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
spark.conf.set("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
spark.conf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
spark.conf.set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp")
spark.conf.set("spark.hadoop.fs.s3a.directory.marker.retention", "keep")
df = spark.range(0, 200).repartition(12).withColumn("value", F.col("id") * 2)
df.write.format("parquet").mode("overwrite").save(target)
count = spark.read.parquet(target).count()
print("WRITE_COUNT=" + str(count))
`
	code, output := runSparkPyScript(t, env.sparkContainer, script, env.s3Port)
	if code != 0 {
		t.Fatalf("Spark script exited with code %d; output:\n%s", code, output)
	}
	if !strings.Contains(output, "WRITE_COUNT=200") {
		t.Fatalf("expected write/read success marker in output, got:\n%s", output)
	}
	// First check: after commit, no listed object key under the output prefix
	// may contain a "_temporary" path segment.
	keys := listObjectKeysByPrefix(t, env, "test", "issue-8285/")
	var temporaryKeys []string
	for _, key := range keys {
		if hasTemporaryPathSegment(key) {
			temporaryKeys = append(temporaryKeys, key)
		}
	}
	if len(temporaryKeys) > 0 {
		t.Fatalf("issue #8285 regression detected: found lingering _temporary artifacts: %v\nall keys: %v", temporaryKeys, keys)
	}
	// Second check: explicit directory markers may be removed asynchronously by
	// the empty-folder cleaner, so poll (HEAD) until they disappear or time out.
	temporaryCandidates := []string{
		"issue-8285/output/_temporary/",
		"issue-8285/output/_temporary/0/",
		"issue-8285/output/_temporary/0/_temporary/",
	}
	lingering := waitForObjectsToDisappear(t, env, "test", temporaryCandidates, 35*time.Second)
	if len(lingering) > 0 {
		t.Fatalf("issue #8285 regression detected: lingering temporary directories: %v", lingering)
	}
}
// listObjectKeysByPrefix returns every object key in bucketName that starts
// with prefix, following all ListObjectsV2 pages. Fails the test on any
// listing error.
func listObjectKeysByPrefix(t *testing.T, env *TestEnvironment, bucketName, prefix string) []string {
	t.Helper()
	paginator := s3.NewListObjectsV2Paginator(newS3Client(env), &s3.ListObjectsV2Input{
		Bucket: aws.String(bucketName),
		Prefix: aws.String(prefix),
	})
	var collected []string
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(context.Background())
		if err != nil {
			t.Fatalf("failed listing objects for prefix %q: %v", prefix, err)
		}
		for _, obj := range page.Contents {
			collected = append(collected, aws.ToString(obj.Key))
		}
	}
	return collected
}
// headObjectInfo issues a HEAD for key in bucketName. It returns
// (true, contentType, nil) when the object exists, (false, "", nil) when it
// does not, and (false, "", err) for any other error.
func headObjectInfo(t *testing.T, env *TestEnvironment, bucketName, key string) (bool, string, error) {
	t.Helper()
	output, err := newS3Client(env).HeadObject(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(key),
	})
	if err == nil {
		return true, aws.ToString(output.ContentType), nil
	}
	// Treat both the typed NotFound error and its string forms as "absent".
	var notFound *s3types.NotFound
	missing := errors.As(err, &notFound) ||
		strings.Contains(err.Error(), "NotFound") ||
		strings.Contains(err.Error(), "NoSuchKey")
	if missing {
		return false, "", nil
	}
	return false, "", err
}
// waitForObjectsToDisappear polls (via HEAD) until every key in keys is gone
// from bucketName or timeout elapses. It returns a description of each key
// that still exists (or last errored), in the order of the input slice; nil
// when everything disappeared in time.
func waitForObjectsToDisappear(t *testing.T, env *TestEnvironment, bucketName string, keys []string, timeout time.Duration) []string {
	t.Helper()
	deadline := time.Now().Add(timeout)
	remaining := make(map[string]struct{}, len(keys))
	lastSeen := make(map[string]string, len(keys))
	for _, k := range keys {
		remaining[k] = struct{}{}
	}
	for len(remaining) > 0 && time.Now().Before(deadline) {
		for k := range remaining {
			exists, contentType, err := headObjectInfo(t, env, bucketName, k)
			switch {
			case err != nil:
				// Keep the key pending but remember why the last probe failed.
				lastSeen[k] = fmt.Sprintf("%s (head_error=%v)", k, err)
			case !exists:
				delete(remaining, k)
				delete(lastSeen, k)
			default:
				lastSeen[k] = fmt.Sprintf("%s (exists=true, contentType=%q)", k, contentType)
			}
		}
		if len(remaining) > 0 {
			time.Sleep(2 * time.Second)
		}
	}
	if len(remaining) == 0 {
		return nil
	}
	// Report stragglers in the caller's original key order.
	lingering := make([]string, 0, len(remaining))
	for _, k := range keys {
		if _, stillPending := remaining[k]; !stillPending {
			continue
		}
		if detail, ok := lastSeen[k]; ok {
			lingering = append(lingering, detail)
		} else {
			lingering = append(lingering, k)
		}
	}
	return lingering
}
// newS3Client builds a path-style S3 client pointed at the test environment's
// local gateway port, using the environment's static credentials.
func newS3Client(env *TestEnvironment) *s3.Client {
	endpoint := fmt.Sprintf("http://localhost:%d", env.s3Port)
	staticCreds := credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")
	return s3.NewFromConfig(aws.Config{
		Region:       "us-east-1",
		Credentials:  aws.NewCredentialsCache(staticCreds),
		BaseEndpoint: aws.String(endpoint),
	}, func(o *s3.Options) {
		// SeaweedFS gateway is addressed by path, not virtual-hosted buckets.
		o.UsePathStyle = true
	})
}
// hasTemporaryPathSegment reports whether any "/"-separated segment of key is
// exactly "_temporary"; a single trailing slash on key is ignored.
func hasTemporaryPathSegment(key string) bool {
	trimmed := strings.TrimSuffix(key, "/")
	if trimmed == "_temporary" {
		return true
	}
	return strings.HasPrefix(trimmed, "_temporary/") ||
		strings.HasSuffix(trimmed, "/_temporary") ||
		strings.Contains(trimmed, "/_temporary/")
}

25
weed/filer/empty_folder_cleanup/empty_folder_cleaner.go

@ -161,8 +161,15 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string,
return
}
// For Spark-style temporary folders, prioritize cleanup on the next processor tick.
// These paths are expected to be ephemeral and can otherwise accumulate quickly.
queueTime := eventTime
if containsTemporaryPathSegment(directory) {
queueTime = eventTime.Add(-efc.cleanupQueue.maxAge)
}
// Add to cleanup queue with event time (handles out-of-order events)
if efc.cleanupQueue.Add(directory, eventTime) {
if efc.cleanupQueue.Add(directory, queueTime) {
glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
}
}
@ -304,7 +311,8 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
efc.mu.Unlock()
}
if !isImplicit {
isTemporaryWorkPath := containsTemporaryPathSegment(folder)
if !isImplicit && !isTemporaryWorkPath {
glog.V(4).Infof("EmptyFolderCleaner: folder %s is not marked as implicit, skipping", folder)
return
}
@ -401,6 +409,19 @@ func isUnderBucketPath(directory, bucketPath string) bool {
return directoryDepth >= bucketPathDepth+2
}
// containsTemporaryPathSegment reports whether any "/"-separated segment of
// path equals "_temporary". Leading/trailing slashes are ignored; an empty
// path never matches.
func containsTemporaryPathSegment(path string) bool {
	rest := strings.Trim(path, "/")
	for rest != "" {
		segment, remainder, _ := strings.Cut(rest, "/")
		if segment == "_temporary" {
			return true
		}
		rest = remainder
	}
	return false
}
// cacheEvictionLoop periodically removes stale entries from folderCounts
func (efc *EmptyFolderCleaner) cacheEvictionLoop() {
ticker := time.NewTicker(efc.cacheExpiry)

23
weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go

@@ -66,6 +66,29 @@ func Test_isUnderBucketPath(t *testing.T) {
}
}
// Test_containsTemporaryPathSegment verifies exact-segment matching of
// "_temporary" in folder paths, including trailing-slash and substring
// edge cases that the cleaner relies on.
func Test_containsTemporaryPathSegment(t *testing.T) {
	tests := []struct {
		name     string
		path     string
		expected bool
	}{
		{"spark temporary root", "/buckets/mybucket/output/_temporary", true},
		{"spark temporary nested", "/buckets/mybucket/output/_temporary/0/task", true},
		{"no temporary segment", "/buckets/mybucket/output/temp", false},
		{"empty path", "", false},
		{"root path", "/", false},
		// Added coverage: trailing slash, substring-only names, bare segment.
		{"trailing slash", "/buckets/mybucket/output/_temporary/", true},
		{"substring not a segment", "/buckets/mybucket/output/x_temporary", false},
		{"bare segment", "_temporary", true},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := containsTemporaryPathSegment(tt.path)
			if result != tt.expected {
				t.Errorf("containsTemporaryPathSegment(%q) = %v, want %v", tt.path, result, tt.expected)
			}
		})
	}
}
func TestEmptyFolderCleaner_ownsFolder(t *testing.T) {
// Create a LockRing with multiple servers
lockRing := lock_manager.NewLockRing(5 * time.Second)

106
weed/s3api/s3api_object_handlers_delete.go

@ -2,9 +2,9 @@ package s3api
import (
"encoding/xml"
"fmt"
"io"
"net/http"
"sort"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -125,13 +125,15 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return
}
target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object))
target := util.NewFullPath(s3a.bucketDir(bucket), object)
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return doDeleteEntry(client, dir, name, true, false)
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
if deleteErr := doDeleteEntry(client, dir, name, true, false); deleteErr != nil {
return deleteErr
}
s3a.cleanupTemporaryParentDirectories(client, bucket, object)
return nil
})
if err != nil {
@ -211,6 +213,13 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
type pendingDirectoryDelete struct {
key string
parent string
name string
}
var pendingDirectoryDeletes []pendingDirectoryDelete
pendingDirectoryDeleteSeen := make(map[string]struct{})
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
@ -340,19 +349,28 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
} else {
// Handle non-versioned delete (original logic)
lastSeparator := strings.LastIndex(object.Key, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
entryName = object.Key[lastSeparator+1:]
parentDirectoryPath = object.Key[:lastSeparator]
}
parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath)
target := util.NewFullPath(s3a.bucketDir(bucket), object.Key)
parentDirectoryPath, entryName := target.DirAndName()
isDeleteData, isRecursive := true, false
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
deletedObjects = append(deletedObjects, object)
s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
s3a.cleanupTemporaryParentDirectories(client, bucket, object.Key)
if entryName != "" {
normalizedKey := strings.TrimSuffix(object.Key, "/")
if _, seen := pendingDirectoryDeleteSeen[normalizedKey]; !seen {
pendingDirectoryDeleteSeen[normalizedKey] = struct{}{}
pendingDirectoryDeletes = append(pendingDirectoryDeletes, pendingDirectoryDelete{
key: normalizedKey,
parent: parentDirectoryPath,
name: entryName,
})
}
}
} else {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
@ -369,6 +387,22 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
if len(pendingDirectoryDeletes) > 0 {
sort.Slice(pendingDirectoryDeletes, func(i, j int) bool {
return len(pendingDirectoryDeletes[i].key) > len(pendingDirectoryDeletes[j].key)
})
for _, pending := range pendingDirectoryDeletes {
retryErr := doDeleteEntry(client, pending.parent, pending.name, true, false)
if retryErr == nil {
continue
}
if strings.Contains(retryErr.Error(), filer.MsgFailDelNonEmptyFolder) || strings.Contains(retryErr.Error(), filer_pb.ErrNotFound.Error()) {
continue
}
glog.V(2).Infof("DeleteMultipleObjectsHandler: retry delete failed for %s: %v", pending.key, retryErr)
}
}
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
@ -386,3 +420,51 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
writeSuccessResponseXML(w, r, deleteResp)
}
// cleanupTemporaryParentDirectories best-effort removes now-empty
// Hadoop/Spark "_temporary" staging directories above a just-deleted object.
// It walks upward from the object's immediate parent toward the bucket root,
// deleting each ancestor whose bucket-relative path still contains a
// "_temporary" segment, and stops at the first ancestor that is outside the
// _temporary subtree, still non-empty, or fails to delete for another reason.
func (s3a *S3ApiServer) cleanupTemporaryParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
	// Normalize the key and bail out unless it actually lives under a
	// "_temporary" path segment — only committer staging paths are touched.
	normalizedKey := strings.Trim(strings.TrimSpace(objectKey), "/")
	if normalizedKey == "" || !containsTemporaryPathSegment(normalizedKey) {
		return
	}
	target := util.NewFullPath(s3a.bucketDir(bucket), normalizedKey)
	parentDirectoryPath, _ := target.DirAndName()
	bucketRoot := s3a.bucketDir(bucket)
	// Climb toward the bucket root; never delete the bucket directory itself.
	for parentDirectoryPath != "" && parentDirectoryPath != "/" && parentDirectoryPath != bucketRoot {
		// Stop once the bucket-relative remainder no longer contains a
		// "_temporary" segment (we have climbed out of the staging subtree).
		relativeParent := strings.TrimPrefix(parentDirectoryPath, bucketRoot)
		relativeParent = strings.TrimPrefix(relativeParent, "/")
		if !containsTemporaryPathSegment(relativeParent) {
			return
		}
		grandParent, directoryName := util.FullPath(parentDirectoryPath).DirAndName()
		if directoryName == "" {
			return
		}
		err := doDeleteEntry(client, grandParent, directoryName, true, false)
		if err == nil {
			// Deleted this level; continue with its parent.
			parentDirectoryPath = grandParent
			continue
		}
		if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
			// Still has children (e.g. sibling task output) — nothing above it
			// can be empty either, so stop here.
			return
		}
		if strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()) {
			// Already gone (possibly removed concurrently); keep climbing.
			parentDirectoryPath = grandParent
			continue
		}
		// Unexpected failure: log at low verbosity and give up (best effort).
		glog.V(2).Infof("cleanupTemporaryParentDirectories: failed deleting %s/%s: %v", grandParent, directoryName, err)
		return
	}
}
// containsTemporaryPathSegment reports whether any "/"-separated segment of
// path equals "_temporary", ignoring leading and trailing slashes.
func containsTemporaryPathSegment(path string) bool {
	remaining := strings.Trim(path, "/")
	for {
		idx := strings.Index(remaining, "/")
		if idx < 0 {
			return remaining == "_temporary"
		}
		if remaining[:idx] == "_temporary" {
			return true
		}
		remaining = remaining[idx+1:]
	}
}

59
weed/s3api/s3api_object_handlers_put.go

@ -125,36 +125,43 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
// Split the object into directory path and name
objectWithoutSlash := strings.TrimSuffix(object, "/")
dirName := path.Dir(objectWithoutSlash)
entryName := path.Base(objectWithoutSlash)
if containsTemporaryPathSegment(objectWithoutSlash) {
// Spark and Hadoop committers may create explicit "_temporary" directory markers.
// Persisting these markers can accumulate stale empty directories.
// Skip materializing temporary markers and rely on implicit directories from actual object writes.
glog.V(3).Infof("PutObjectHandler: skipping temporary directory marker %s/%s", bucket, object)
} else {
dirName := path.Dir(objectWithoutSlash)
entryName := path.Base(objectWithoutSlash)
if dirName == "." {
dirName = ""
}
dirName = strings.TrimPrefix(dirName, "/")
if dirName == "." {
dirName = ""
}
dirName = strings.TrimPrefix(dirName, "/")
// Construct full directory path
fullDirPath := s3a.bucketDir(bucket)
if dirName != "" {
fullDirPath = fullDirPath + "/" + dirName
}
// Construct full directory path
fullDirPath := s3a.bucketDir(bucket)
if dirName != "" {
fullDirPath = fullDirPath + "/" + dirName
}
if err := s3a.mkdir(
fullDirPath, entryName,
func(entry *filer_pb.Entry) {
if objectContentType == "" {
objectContentType = s3_constants.FolderMimeType
}
if r.ContentLength > 0 {
entry.Content, _ = io.ReadAll(r.Body)
}
entry.Attributes.Mime = objectContentType
if err := s3a.mkdir(
fullDirPath, entryName,
func(entry *filer_pb.Entry) {
if objectContentType == "" {
objectContentType = s3_constants.FolderMimeType
}
if r.ContentLength > 0 {
entry.Content, _ = io.ReadAll(r.Body)
}
entry.Attributes.Mime = objectContentType
// Set object owner for directory objects (same as regular objects)
s3a.setObjectOwnerFromRequest(r, bucket, entry)
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
// Set object owner for directory objects (same as regular objects)
s3a.setObjectOwnerFromRequest(r, bucket, entry)
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
} else {
// Get detailed versioning state for the bucket

Loading…
Cancel
Save