Browse Source

Fix Spark temp cleanup with implicit directory markers

pull/8292/head
Chris Lu 14 hours ago
parent
commit
2737e20916
  1. test/s3/spark/issue_8234_repro_test.go (2 lines changed)
  2. test/s3/spark/setup_test.go (32 lines changed)
  3. weed/filer/empty_folder_cleanup/empty_folder_cleaner.go (5 lines changed)
  4. weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go (6 lines changed)
  5. weed/s3api/s3api_object_handlers_put.go (6 lines changed)

2
test/s3/spark/issue_8234_repro_test.go

@@ -14,7 +14,7 @@ func setupSparkIssue8234Env(t *testing.T) *TestEnvironment {
}
env.StartSeaweedFS(t)
t.Cleanup(func() { env.Cleanup() })
t.Cleanup(func() { env.Cleanup(t) })
createObjectBucket(t, env, "test")
env.startSparkContainer(t)

32
test/s3/spark/setup_test.go

@@ -8,6 +8,7 @@ import (
"net"
"os"
"os/exec"
"path/filepath"
"sync"
"testing"
"time"
@@ -53,6 +54,8 @@ type TestEnvironment struct {
dockerAvailable bool
weedBinary string
seaweedfsDataDir string
weedLogPath string
weedLogFile *os.File
masterPort int
filerPort int
s3Port int
@@ -113,6 +116,15 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
"-s3.config", iamConfigPath,
"-dir", env.seaweedfsDataDir,
)
weedLogPath := filepath.Join(env.seaweedfsDataDir, "weed-mini.log")
weedLogFile, err := os.Create(weedLogPath)
if err != nil {
t.Fatalf("failed to create weed log file: %v", err)
}
env.weedLogPath = weedLogPath
env.weedLogFile = weedLogFile
env.masterProcess.Stdout = weedLogFile
env.masterProcess.Stderr = weedLogFile
env.masterProcess.Env = append(os.Environ(),
"AWS_ACCESS_KEY_ID="+env.accessKey,
"AWS_SECRET_ACCESS_KEY="+env.secretKey,
@@ -160,12 +172,30 @@ func (env *TestEnvironment) startSparkContainer(t *testing.T) {
env.sparkContainer = container
}
func (env *TestEnvironment) Cleanup() {
func (env *TestEnvironment) Cleanup(t *testing.T) {
if env.masterProcess != nil && env.masterProcess.Process != nil {
_ = env.masterProcess.Process.Kill()
_ = env.masterProcess.Wait()
}
clearMiniProcess(env.masterProcess)
if env.weedLogFile != nil {
_ = env.weedLogFile.Close()
}
if t.Failed() && os.Getenv("CI") != "" && env.weedLogPath != "" {
logData, err := os.ReadFile(env.weedLogPath)
if err != nil {
t.Logf("failed to read weed mini log file %s: %v", env.weedLogPath, err)
} else {
// Print the tail to keep CI output manageable while preserving failure context.
const maxTailBytes = 64 * 1024
start := 0
if len(logData) > maxTailBytes {
start = len(logData) - maxTailBytes
}
t.Logf("weed mini logs (tail, %d bytes):\n%s", len(logData)-start, string(logData[start:]))
}
}
if env.sparkContainer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

5
weed/filer/empty_folder_cleanup/empty_folder_cleaner.go

@@ -319,9 +319,8 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
}
if !isImplicit {
// Some S3 clients create directory markers without implicit-dir metadata.
// For delete-driven cleanup, still verify emptiness and remove if safe.
glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), proceeding with delete-driven empty check", folder, implicitSource, implicitAttr)
glog.Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), skipping", folder, implicitSource, implicitAttr)
return
}
// Check if folder is actually empty (count up to maxCountCheck)

6
weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go

@@ -644,7 +644,7 @@ func TestEmptyFolderCleaner_processCleanupQueue_drainsAllOnceTriggered(t *testin
}
}
func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeStillDeletes(t *testing.T) {
func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeSkips(t *testing.T) {
lockRing := lock_manager.NewLockRing(5 * time.Second)
lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"})
@@ -679,7 +679,7 @@ func TestEmptyFolderCleaner_executeCleanup_missingImplicitAttributeStillDeletes(
folder := "/buckets/test/folder"
cleaner.executeCleanup(folder)
if len(deleted) != 1 || deleted[0] != folder {
t.Fatalf("expected folder %s to be deleted, got %v", folder, deleted)
if len(deleted) != 0 {
t.Fatalf("expected folder %s to be skipped, got deletions %v", folder, deleted)
}
}

6
weed/s3api/s3api_object_handlers_put.go

@@ -149,6 +149,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
entry.Content, _ = io.ReadAll(r.Body)
}
entry.Attributes.Mime = objectContentType
if entry.Extended == nil {
entry.Extended = make(map[string][]byte, 1)
}
// Treat directory-marker PUT entries as implicit so async cleanup can prune
// them when all children are deleted.
entry.Extended[s3_constants.ExtS3ImplicitDir] = []byte("true")
// Set object owner for directory objects (same as regular objects)
s3a.setObjectOwnerFromRequest(r, bucket, entry)

Loading…
Cancel
Save