Chris Lu committed 2 days ago (committed by GitHub)
commit 9a2d640fb4
  1. weed/filer/filer.go (45)
  2. weed/mq/offset/consumer_group_storage.go (2)
  3. weed/pb/filer.proto (2)
  4. weed/pb/filer_pb/filer.pb.go (503)
  5. weed/pb/filer_pb/filer_client.go (22)
  6. weed/pb/filer_pb/filer_grpc.pb.go (4)
  7. weed/s3api/s3api_object_handlers_delete.go (38)
  8. weed/server/filer_grpc_server.go (24)
  9. weed/shell/command_remote_unmount.go (2)

weed/filer/filer.go (45)

@@ -351,11 +351,17 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
 		if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
 			if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
 				glog.ErrorfCtx(ctx, "FindEntry doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+				// Return error to prevent serving expired content (safer than returning the entry)
+				return nil, fmt.Errorf("failed to delete expired entry %s: %w", entry.FullPath, delErr)
 			}
 			return nil, filer_pb.ErrNotFound
 		}
 	} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
-		f.Store.DeleteOneEntry(ctx, entry)
+		if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
+			glog.ErrorfCtx(ctx, "FindEntry DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
+			// Return error to prevent serving expired content (safer than returning the entry)
+			return nil, fmt.Errorf("failed to delete expired entry %s: %w", entry.FullPath, delErr)
+		}
 		return nil, filer_pb.ErrNotFound
 	}
 }
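Because the new failure path wraps the delete error with %w, callers can still tell a cleanly expired entry (plain filer_pb.ErrNotFound) apart from a failed expiry deletion. A minimal standalone sketch of that distinction; the names here are hypothetical stand-ins, not code from this commit:

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for filer_pb.ErrNotFound.
var errNotFound = errors.New("filer: no entry is found in filer store")

// findEntry mimics the control flow above: an expired entry is deleted and
// reported as not found; if the deletion fails, the error is wrapped with %w.
func findEntry(deleteFails bool) error {
	if deleteFails {
		delErr := errors.New("store unavailable")
		return fmt.Errorf("failed to delete expired entry %s: %w", "/buckets/b/k", delErr)
	}
	return errNotFound
}

func main() {
	for _, fail := range []bool{false, true} {
		if err := findEntry(fail); errors.Is(err, errNotFound) {
			fmt.Println("expired entry treated as missing:", err)
		} else {
			fmt.Println("expiry deletion failed, no stale data served:", err)
		}
	}
}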
@@ -400,23 +406,46 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 	}
 	// Delete expired entries after iteration completes to avoid DB connection deadlock
+	// Use context.WithoutCancel to ensure cleanup completes even if request is cancelled
+	if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 {
+		opCtx := context.WithoutCancel(ctx)
+		// Delete all expired entries first
+		deletedCount := 0
 		for _, entry := range s3ExpiredEntries {
-			if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+			if delErr := f.doDeleteEntryMetaAndData(opCtx, entry, true, false, nil); delErr != nil {
 				glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+			} else {
+				deletedCount++
 			}
 		}
 		for _, entry := range expiredEntries {
-			if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
+			if delErr := f.Store.DeleteOneEntry(opCtx, entry); delErr != nil {
 				glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
+			} else {
+				deletedCount++
 			}
 		}
-	// After expiring entries, the directory might be empty.
-	// Attempt to clean it up and any empty parent directories.
-	if !hasValidEntries && p != "/" && startFileName == "" {
-		stopAtPath := util.FullPath(f.DirBucketsPath)
-		f.DeleteEmptyParentDirectories(ctx, p, stopAtPath)
-	}
+		// After successfully expiring entries, check if directory is now empty and cleanup
+		// Only do this on first page (startFileName == "") to avoid partial directory states
+		// DeleteEmptyParentDirectories has built-in protection against deleting bucket directories
+		if deletedCount > 0 && !hasValidEntries && p != "/" && startFileName == "" {
+			glog.V(2).InfofCtx(ctx, "doListDirectoryEntries: deleted %d expired entries from %s, checking for empty directory cleanup", deletedCount, p)
+			// Determine appropriate stop path based on whether this is an S3 path
+			var stopAtPath util.FullPath
+			if strings.HasPrefix(string(p), f.DirBucketsPath+"/") {
+				// S3 path: stop at the bucket root (e.g., /buckets/mybucket)
+				pathAfterBuckets := strings.TrimPrefix(string(p), f.DirBucketsPath+"/")
+				bucketName, _, _ := strings.Cut(pathAfterBuckets, "/")
+				stopAtPath = util.NewFullPath(f.DirBucketsPath, bucketName)
+			} else {
+				// Non-S3 path: allow cleanup up to root
+				stopAtPath = "/"
+			}
+			f.DeleteEmptyParentDirectories(opCtx, p, stopAtPath)
+		}
+	}
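The stop-path branch above reduces to a small pure function. A standalone sketch, assuming the default DirBucketsPath of "/buckets"; stopPathFor is a hypothetical helper, not part of the commit:

package main

import (
	"fmt"
	"strings"
)

// stopPathFor mirrors the branch above: inside the buckets tree, cleanup must
// stop at the bucket root; elsewhere it may walk all the way up to "/".
func stopPathFor(p, dirBucketsPath string) string {
	if strings.HasPrefix(p, dirBucketsPath+"/") {
		pathAfterBuckets := strings.TrimPrefix(p, dirBucketsPath+"/")
		bucketName, _, _ := strings.Cut(pathAfterBuckets, "/")
		return dirBucketsPath + "/" + bucketName
	}
	return "/"
}

func main() {
	fmt.Println(stopPathFor("/buckets/mybucket/photos/2024", "/buckets")) // /buckets/mybucket
	fmt.Println(stopPathFor("/home/user/tmp", "/buckets"))                // /
}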

weed/mq/offset/consumer_group_storage.go (2)

@@ -176,6 +176,6 @@ func (f *FilerConsumerGroupOffsetStorage) DeleteConsumerGroupOffset(t topic.Topi
 	offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
 	return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil)
+		return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil, false, "")
 	})
 }

weed/pb/filer.proto (2)

@@ -232,6 +232,8 @@ message DeleteEntryRequest {
   bool is_from_other_cluster = 7;
   repeated int32 signatures = 8;
   int64 if_not_modified_after = 9;
+  bool delete_empty_parent_directories = 10; // If true, recursively delete empty parent directories
+  string delete_empty_parent_directories_stop_path = 11; // Stop empty directory cleanup at this path (e.g., "/buckets/mybucket")
 }
 
 message DeleteEntryResponse {
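A client populating the two new fields through the generated Go bindings might look like this sketch; buildDeleteRequest and the paths are illustrative, while the field names match the generated filer_pb code updated elsewhere in this commit:

// buildDeleteRequest is a hypothetical helper showing the new fields in use.
func buildDeleteRequest(dir, name, stopPath string) *filer_pb.DeleteEntryRequest {
	return &filer_pb.DeleteEntryRequest{
		Directory:                            dir,
		Name:                                 name,
		IsDeleteData:                         true,
		DeleteEmptyParentDirectories:         true,     // opt in to server-side cleanup
		DeleteEmptyParentDirectoriesStopPath: stopPath, // e.g. "/buckets/mybucket"
	}
}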

weed/pb/filer_pb/filer.pb.go (503)
File diff suppressed because it is too large

weed/pb/filer_pb/filer_client.go (22)

@@ -278,19 +278,21 @@ func MkFile(ctx context.Context, filerClient FilerClient, parentDirectoryPath st
 func Remove(ctx context.Context, filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
 	return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
-		return DoRemove(ctx, client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
+		return DoRemove(ctx, client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures, false, "")
 	})
 }
 
-func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error {
+func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32, deleteEmptyParentDirectories bool, stopPath string) error {
 	deleteEntryRequest := &DeleteEntryRequest{
-		Directory:            parentDirectoryPath,
-		Name:                 name,
-		IsDeleteData:         isDeleteData,
-		IsRecursive:          isRecursive,
-		IgnoreRecursiveError: ignoreRecursiveErr,
-		IsFromOtherCluster:   isFromOtherCluster,
-		Signatures:           signatures,
+		Directory:                            parentDirectoryPath,
+		Name:                                 name,
+		IsDeleteData:                         isDeleteData,
+		IsRecursive:                          isRecursive,
+		IgnoreRecursiveError:                 ignoreRecursiveErr,
+		IsFromOtherCluster:                   isFromOtherCluster,
+		Signatures:                           signatures,
+		DeleteEmptyParentDirectories:         deleteEmptyParentDirectories,
+		DeleteEmptyParentDirectoriesStopPath: stopPath,
 	}
 	if resp, err := client.DeleteEntry(ctx, deleteEntryRequest); err != nil {
 		if strings.Contains(err.Error(), ErrNotFound.Error()) {
@@ -356,7 +358,7 @@ func DoDeleteEmptyParentDirectories(ctx context.Context, client SeaweedFilerClie
 	glog.V(2).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
 	parentDir, dirName := dirPath.DirAndName()
-	if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil); err == nil {
+	if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil, false, ""); err == nil {
 		// Successfully deleted, continue checking upwards
 		DoDeleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked)
 	} else {
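For callers, the widened signature keeps old behavior by passing false, "" for the two new arguments, as every existing call site in this commit does. A hedged usage sketch of opting in; removeWithCleanup and the paths are hypothetical, and context plus the repository's filer_pb package are assumed imported:

func removeWithCleanup(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
	return filer_pb.DoRemove(ctx, client,
		"/buckets/mybucket/photos", // parentDirectoryPath
		"2024.jpg",                 // name
		true,                       // isDeleteData
		false,                      // isRecursive
		false,                      // ignoreRecursiveErr
		false,                      // isFromOtherCluster
		nil,                        // signatures
		true,                       // deleteEmptyParentDirectories
		"/buckets/mybucket",        // stop cleanup at the bucket root
	)
}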

weed/pb/filer_pb/filer_grpc.pb.go (4)

@@ -2,7 +2,7 @@
 // versions:
 // - protoc-gen-go-grpc v1.5.1
 // - protoc v5.29.3
-// source: filer.proto
+// source: weed/pb/filer.proto
 
 package filer_pb
@@ -1047,5 +1047,5 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
 			ServerStreams: true,
 		},
 	},
-	Metadata: "filer.proto",
+	Metadata: "weed/pb/filer.proto",
 }

weed/s3api/s3api_object_handlers_delete.go (38)

@@ -131,18 +131,11 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
 		// This ensures deletion completes atomically to avoid inconsistent state
 		opCtx := context.WithoutCancel(r.Context())
-		if err := doDeleteEntry(client, dir, name, true, false); err != nil {
-			return err
-		}
-
-		// Cleanup empty directories
-		if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
-			bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-			// Recursively delete empty parent directories, stop at bucket path
-			filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
-		}
-
-		return nil
+		// Delete entry with optional empty parent directory cleanup
+		bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
+		deleteEmptyDirs := !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0
+
+		return filer_pb.DoRemove(opCtx, client, dir, name, true, false, true, false, nil, deleteEmptyDirs, bucketPath)
 	})
 	if err != nil {
 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
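The opCtx above relies on context.WithoutCancel (Go 1.21+), which returns a context that is never canceled even when its parent is, while still carrying the parent's values. A tiny self-contained demonstration of why the cleanup survives a dropped request:

package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	opCtx := context.WithoutCancel(ctx) // detached from cancellation, keeps ctx's values
	cancel()                            // simulate the S3 client dropping the request

	fmt.Println(ctx.Err())   // context.Canceled
	fmt.Println(opCtx.Err()) // <nil>: the deletion and cleanup can still finish
}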
@@ -222,6 +215,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 	var deleteErrors []DeleteError
 	var auditLog *s3err.AccessLog
 
+	// Track directories with deletions for batch cleanup optimization
 	directoriesWithDeletion := make(map[string]bool)
 
 	if s3err.Logger != nil {
@@ -346,7 +340,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 				continue
 			}
 		} else {
-			// Handle non-versioned delete (original logic)
+			// Handle non-versioned delete (defer cleanup for batch optimization)
 			lastSeparator := strings.LastIndex(object.Key, "/")
 			parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
 			if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
@@ -355,10 +349,11 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 			}
 			parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
 
-			err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
+			// Delete file without cleanup (batch cleanup at the end for efficiency)
+			err := filer_pb.DoRemove(opCtx, client, parentDirectoryPath, entryName, isDeleteData, isRecursive, true, false, nil, false, "")
 			if err == nil {
-				// Track directory for empty directory cleanup
-				if !s3a.option.AllowEmptyFolder {
+				// Track directory for batch cleanup
+				if !s3a.option.AllowEmptyFolder && lastSeparator > 0 {
 					directoriesWithDeletion[parentDirectoryPath] = true
 				}
 				deletedObjects = append(deletedObjects, object)
@@ -380,26 +375,29 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 		}
 	}
 
-	// Cleanup empty directories - optimize by processing deepest first
+	// Batch cleanup: Process empty directories after all deletions
+	// This is much more efficient than checking after each deletion
 	if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
 		bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-		// Collect and sort directories by depth (deepest first) to avoid redundant checks
+		// Sort directories by depth (deepest first) to avoid redundant checks
+		// Deeper directories are more likely to be empty and cleaning them first
+		// may make their parents empty, reducing total checks needed
 		var allDirs []string
 		for dirPath := range directoriesWithDeletion {
 			allDirs = append(allDirs, dirPath)
 		}
 		// Sort by depth (deeper directories first)
 		slices.SortFunc(allDirs, func(a, b string) int {
 			return strings.Count(b, "/") - strings.Count(a, "/")
 		})
 		// Track already-checked directories to avoid redundant work
 		// When we check a directory and recursively clean parents,
 		// mark them all as checked so we skip them in subsequent iterations
 		checked := make(map[string]bool)
 		for _, dirPath := range allDirs {
 			if !checked[dirPath] {
 				// Recursively delete empty parent directories, stop at bucket path
 				// Mark this directory and all its parents as checked during recursion
 				// Use server-side cleanup for consistency
 				filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
 			}
 		}
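The deepest-first ordering is the heart of the batch cleanup. A standalone sketch of just that sort, with illustrative paths:

package main

import (
	"fmt"
	"slices"
	"strings"
)

func main() {
	// Directories that just had deletions.
	allDirs := []string{
		"/buckets/b/a",
		"/buckets/b/a/x/y",
		"/buckets/b/a/x",
	}
	// More "/" separators sort first, so a child is emptied (and removed)
	// before its parent is examined, which may in turn empty the parent.
	slices.SortFunc(allDirs, func(a, b string) int {
		return strings.Count(b, "/") - strings.Count(a, "/")
	})
	fmt.Println(allDirs) // [/buckets/b/a/x/y /buckets/b/a/x /buckets/b/a]
}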

weed/server/filer_grpc_server.go (24)

@@ -293,9 +293,29 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
 	err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
 	resp = &filer_pb.DeleteEntryResponse{}
-	if err != nil && err != filer_pb.ErrNotFound {
-		resp.Error = err.Error()
+	if err != nil {
+		if err != filer_pb.ErrNotFound {
+			resp.Error = err.Error()
+		}
+		// Return early: either a real error or entry not found (nothing deleted, so no cleanup needed)
+		return resp, nil
 	}
+
+	// Optional cleanup of empty parent directories (only if deletion succeeded)
+	if req.DeleteEmptyParentDirectories {
+		stopAtPath := util.FullPath(req.DeleteEmptyParentDirectoriesStopPath)
+		if stopAtPath == "" {
+			// Default to root to allow cleanup for non-S3 paths
+			// S3 API clients provide a specific bucket stop path
+			stopAtPath = "/"
+		}
+		// Use non-cancellable context to ensure cleanup completes atomically
+		// even if the client cancels the request after deletion succeeds
+		opCtx := context.WithoutCancel(ctx)
+		fs.filer.DeleteEmptyParentDirectories(opCtx, util.FullPath(req.Directory), stopAtPath)
+	}
+
 	return resp, nil
 }
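Filer.DeleteEmptyParentDirectories itself is not part of this diff. As a rough mental model only, it plausibly walks upward until it reaches the stop path or a non-empty directory, roughly like the following sketch; isEmpty and deleteDir are abstractions, and this is not the repository implementation:

package main

import (
	"fmt"
	"path"
)

func deleteEmptyParents(dir, stopAt string, isEmpty func(string) bool, deleteDir func(string) error) {
	for dir != "/" && dir != stopAt {
		if !isEmpty(dir) {
			return // first non-empty directory ends the walk
		}
		if err := deleteDir(dir); err != nil {
			return // stop on error rather than leave holes
		}
		dir = path.Dir(dir) // continue with the parent
	}
}

func main() {
	empty := map[string]bool{"/buckets/b/a/x": true, "/buckets/b/a": true}
	deleteEmptyParents("/buckets/b/a/x", "/buckets/b",
		func(d string) bool { return empty[d] },
		func(d string) error { fmt.Println("deleted", d); return nil },
	)
	// Output:
	// deleted /buckets/b/a/x
	// deleted /buckets/b/a
}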

weed/shell/command_remote_unmount.go (2)

@@ -100,7 +100,7 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri
 	oldEntry := lookupResp.Entry
 
-	deleteError := filer_pb.DoRemove(ctx, client, parent, name, true, true, true, false, nil)
+	deleteError := filer_pb.DoRemove(ctx, client, parent, name, true, true, true, false, nil, false, "")
 	if deleteError != nil {
 		return fmt.Errorf("delete %s: %v", dir, deleteError)
 	}
