Browse Source

Revert synchronous folder pruning and add cleanup diagnostics

pull/8292/head
Chris Lu 16 hours ago
parent
commit
d40154f36e
  1. 66
      weed/filer/empty_folder_cleanup/empty_folder_cleaner.go
  2. 91
      weed/s3api/s3api_object_handlers_delete.go

66
weed/filer/empty_folder_cleanup/empty_folder_cleaner.go

@@ -9,6 +9,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
@@ -159,10 +161,8 @@ func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string,
return return
} }
// Queue with an age that is immediately eligible at next processor tick.
// This keeps empty-folder cleanup responsive while preserving queue ordering/dedup.
queueTime := eventTime.Add(-efc.cleanupQueue.maxAge)
if efc.cleanupQueue.Add(directory, queueTime) {
// Add to cleanup queue with event time (handles out-of-order events)
if efc.cleanupQueue.Add(directory, eventTime) {
glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory) glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory)
} }
} }
@@ -214,6 +214,8 @@ func (efc *EmptyFolderCleaner) cleanupProcessor() {
func (efc *EmptyFolderCleaner) processCleanupQueue() { func (efc *EmptyFolderCleaner) processCleanupQueue() {
// Check if we should process // Check if we should process
if !efc.cleanupQueue.ShouldProcess() { if !efc.cleanupQueue.ShouldProcess() {
glog.V(4).Infof("EmptyFolderCleaner: skipping queue processing (len=%d, oldest_age=%v, max_size=%d, max_age=%v)",
efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge(), efc.cleanupQueue.maxSize, efc.cleanupQueue.maxAge)
return return
} }
@@ -269,8 +271,62 @@ func (efc *EmptyFolderCleaner) executeCleanup(folder string) {
return return
} }
// Check if folder is actually empty (count up to maxCountCheck)
// Check for explicit implicit_dir attribute
// First check cache
ctx := context.Background() ctx := context.Background()
efc.mu.RLock()
var cachedImplicit *bool
if state, exists := efc.folderCounts[folder]; exists {
cachedImplicit = state.isImplicit
}
efc.mu.RUnlock()
var isImplicit bool
implicitSource := "cache"
implicitAttr := "<cached>"
if cachedImplicit != nil {
isImplicit = *cachedImplicit
if isImplicit {
implicitAttr = "true"
} else {
implicitAttr = "false"
}
} else {
implicitSource = "filer"
// Not cached, check filer
attrs, err := efc.filer.GetEntryAttributes(ctx, util.FullPath(folder))
if err != nil {
if err == filer_pb.ErrNotFound {
return
}
glog.V(2).Infof("EmptyFolderCleaner: error getting attributes for %s: %v", folder, err)
return
}
isImplicit = attrs != nil && string(attrs[s3_constants.ExtS3ImplicitDir]) == "true"
if attrs == nil {
implicitAttr = "<no_attrs>"
} else if value, found := attrs[s3_constants.ExtS3ImplicitDir]; found {
implicitAttr = string(value)
} else {
implicitAttr = "<missing>"
}
// Update cache
efc.mu.Lock()
if _, exists := efc.folderCounts[folder]; !exists {
efc.folderCounts[folder] = &folderState{}
}
efc.folderCounts[folder].isImplicit = &isImplicit
efc.mu.Unlock()
}
if !isImplicit {
glog.V(2).Infof("EmptyFolderCleaner: folder %s is not marked as implicit (source=%s attr=%s), skipping", folder, implicitSource, implicitAttr)
return
}
// Check if folder is actually empty (count up to maxCountCheck)
count, err := efc.countItems(ctx, folder) count, err := efc.countItems(ctx, folder)
if err != nil { if err != nil {
glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err) glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err)

91
weed/s3api/s3api_object_handlers_delete.go

@@ -2,9 +2,9 @@ package s3api
import ( import (
"encoding/xml" "encoding/xml"
"fmt"
"io" "io"
"net/http" "net/http"
"sort"
"strings" "strings"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
@@ -125,15 +125,13 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return return
} }
target := util.NewFullPath(s3a.bucketDir(bucket), object)
target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object))
dir, name := target.DirAndName() dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if deleteErr := doDeleteEntry(client, dir, name, true, false); deleteErr != nil {
return deleteErr
}
s3a.cleanupEmptyParentDirectories(client, bucket, object)
return nil
return doDeleteEntry(client, dir, name, true, false)
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination
}) })
if err != nil { if err != nil {
@@ -213,13 +211,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deletedObjects []ObjectIdentifier var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError var deleteErrors []DeleteError
var auditLog *s3err.AccessLog var auditLog *s3err.AccessLog
type pendingDirectoryDelete struct {
key string
parent string
name string
}
var pendingDirectoryDeletes []pendingDirectoryDelete
pendingDirectoryDeleteSeen := make(map[string]struct{})
if s3err.Logger != nil { if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
@@ -349,28 +340,19 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
} }
} else { } else {
// Handle non-versioned delete (original logic) // Handle non-versioned delete (original logic)
target := util.NewFullPath(s3a.bucketDir(bucket), object.Key)
parentDirectoryPath, entryName := target.DirAndName()
isDeleteData, isRecursive := true, false
lastSeparator := strings.LastIndex(object.Key, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
entryName = object.Key[lastSeparator+1:]
parentDirectoryPath = object.Key[:lastSeparator]
}
parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath)
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil { if err == nil {
deletedObjects = append(deletedObjects, object) deletedObjects = append(deletedObjects, object)
s3a.cleanupEmptyParentDirectories(client, bucket, object.Key)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object) deletedObjects = append(deletedObjects, object)
s3a.cleanupEmptyParentDirectories(client, bucket, object.Key)
if entryName != "" {
normalizedKey := strings.TrimSuffix(object.Key, "/")
if _, seen := pendingDirectoryDeleteSeen[normalizedKey]; !seen {
pendingDirectoryDeleteSeen[normalizedKey] = struct{}{}
pendingDirectoryDeletes = append(pendingDirectoryDeletes, pendingDirectoryDelete{
key: normalizedKey,
parent: parentDirectoryPath,
name: entryName,
})
}
}
} else { } else {
deleteErrors = append(deleteErrors, DeleteError{ deleteErrors = append(deleteErrors, DeleteError{
Code: "", Code: "",
@@ -387,22 +369,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
} }
} }
if len(pendingDirectoryDeletes) > 0 {
sort.Slice(pendingDirectoryDeletes, func(i, j int) bool {
return len(pendingDirectoryDeletes[i].key) > len(pendingDirectoryDeletes[j].key)
})
for _, pending := range pendingDirectoryDeletes {
retryErr := doDeleteEntry(client, pending.parent, pending.name, true, false)
if retryErr == nil {
continue
}
if strings.Contains(retryErr.Error(), filer.MsgFailDelNonEmptyFolder) || strings.Contains(retryErr.Error(), filer_pb.ErrNotFound.Error()) {
continue
}
glog.V(2).Infof("DeleteMultipleObjectsHandler: retry delete failed for %s: %v", pending.key, retryErr)
}
}
// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
// which listens to metadata events and uses consistent hashing for coordination // which listens to metadata events and uses consistent hashing for coordination
@@ -420,36 +386,3 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
writeSuccessResponseXML(w, r, deleteResp) writeSuccessResponseXML(w, r, deleteResp)
} }
// cleanupEmptyParentDirectories walks upward from the deleted object's parent
// directory toward the bucket root, removing each directory that has become
// empty. The climb stops at the bucket root, at the first directory that is
// still non-empty, or on any unexpected delete error.
func (s3a *S3ApiServer) cleanupEmptyParentDirectories(client filer_pb.SeaweedFilerClient, bucket, objectKey string) {
	key := strings.Trim(strings.TrimSpace(objectKey), "/")
	if key == "" {
		return
	}
	root := s3a.bucketDir(bucket)
	current, _ := util.NewFullPath(root, key).DirAndName()
	for {
		if current == "" || current == "/" || current == root {
			return
		}
		parent, dirName := util.FullPath(current).DirAndName()
		if dirName == "" {
			return
		}
		err := doDeleteEntry(client, parent, dirName, true, false)
		switch {
		case err == nil:
			// Deleted successfully; the parent may now be empty too, so keep climbing.
			current = parent
		case strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder):
			// Directory still has children — nothing above it can be empty either.
			return
		case strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()):
			// Already gone (e.g. removed concurrently); continue upward.
			current = parent
		default:
			glog.V(2).Infof("cleanupEmptyParentDirectories: failed deleting %s/%s: %v", parent, dirName, err)
			return
		}
	}
}
Loading…
Cancel
Save