Browse Source

optionally delete empty parent directories

also-delete-parent-directory-if-empty
chrislu 4 weeks ago
parent
commit
a8a4bc06e8
  1. 2
      weed/mq/offset/consumer_group_storage.go
  2. 2
      weed/pb/filer.proto
  3. 503
      weed/pb/filer_pb/filer.pb.go
  4. 22
      weed/pb/filer_pb/filer_client.go
  5. 4
      weed/pb/filer_pb/filer_grpc.pb.go
  6. 17
      weed/s3api/s3api_object_handlers_delete.go
  7. 13
      weed/server/filer_grpc_server.go
  8. 2
      weed/shell/command_remote_unmount.go

2
weed/mq/offset/consumer_group_storage.go

@ -176,6 +176,6 @@ func (f *FilerConsumerGroupOffsetStorage) DeleteConsumerGroupOffset(t topic.Topi
offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
return f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil)
return filer_pb.DoRemove(context.Background(), client, consumersDir, offsetFileName, false, false, false, false, nil, false, "")
})
}

2
weed/pb/filer.proto

@ -232,6 +232,8 @@ message DeleteEntryRequest {
bool is_from_other_cluster = 7;
repeated int32 signatures = 8;
int64 if_not_modified_after = 9;
bool delete_empty_parent_directories = 10; // If true, recursively delete empty parent directories
string delete_empty_parent_directories_stop_path = 11; // Stop empty directory cleanup at this path (e.g., "/buckets/mybucket")
}
message DeleteEntryResponse {

503
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File

22
weed/pb/filer_pb/filer_client.go

@ -278,19 +278,21 @@ func MkFile(ctx context.Context, filerClient FilerClient, parentDirectoryPath st
func Remove(ctx context.Context, filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
return DoRemove(ctx, client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
return DoRemove(ctx, client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures, false, "")
})
}
func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error {
func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32, deleteEmptyParentDirectories bool, stopPath string) error {
deleteEntryRequest := &DeleteEntryRequest{
Directory: parentDirectoryPath,
Name: name,
IsDeleteData: isDeleteData,
IsRecursive: isRecursive,
IgnoreRecursiveError: ignoreRecursiveErr,
IsFromOtherCluster: isFromOtherCluster,
Signatures: signatures,
Directory: parentDirectoryPath,
Name: name,
IsDeleteData: isDeleteData,
IsRecursive: isRecursive,
IgnoreRecursiveError: ignoreRecursiveErr,
IsFromOtherCluster: isFromOtherCluster,
Signatures: signatures,
DeleteEmptyParentDirectories: deleteEmptyParentDirectories,
DeleteEmptyParentDirectoriesStopPath: stopPath,
}
if resp, err := client.DeleteEntry(ctx, deleteEntryRequest); err != nil {
if strings.Contains(err.Error(), ErrNotFound.Error()) {
@ -356,7 +358,7 @@ func DoDeleteEmptyParentDirectories(ctx context.Context, client SeaweedFilerClie
glog.V(2).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
parentDir, dirName := dirPath.DirAndName()
if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil); err == nil {
if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil, false, ""); err == nil {
// Successfully deleted, continue checking upwards
DoDeleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked)
} else {

4
weed/pb/filer_pb/filer_grpc.pb.go

@ -2,7 +2,7 @@
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: filer.proto
// source: weed/pb/filer.proto
package filer_pb
@ -1047,5 +1047,5 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
},
Metadata: "filer.proto",
Metadata: "weed/pb/filer.proto",
}

17
weed/s3api/s3api_object_handlers_delete.go

@ -131,18 +131,11 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// This ensures deletion completes atomically to avoid inconsistent state
opCtx := context.WithoutCancel(r.Context())
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
return err
}
// Cleanup empty directories
if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
// Recursively delete empty parent directories, stop at bucket path
filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
}
// Delete entry with optional empty parent directory cleanup
bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
deleteEmptyDirs := !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0
return nil
return filer_pb.DoRemove(opCtx, client, dir, name, true, false, true, false, nil, deleteEmptyDirs, bucketPath)
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@ -355,7 +348,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
err := filer_pb.DoRemove(opCtx, client, parentDirectoryPath, entryName, isDeleteData, isRecursive, true, false, nil, false, "")
if err == nil {
// Track directory for empty directory cleanup
if !s3a.option.AllowEmptyFolder {

13
weed/server/filer_grpc_server.go

@ -295,7 +295,20 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
resp = &filer_pb.DeleteEntryResponse{}
if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
return resp, nil
}
// Optional cleanup of empty parent directories
if req.DeleteEmptyParentDirectories {
stopAtPath := util.FullPath(req.DeleteEmptyParentDirectoriesStopPath)
if stopAtPath == "" {
stopAtPath = util.FullPath(fs.filer.DirBucketsPath)
}
// Clean up empty parent directories starting from req.Directory
fs.filer.DeleteEmptyParentDirectories(ctx, util.FullPath(req.Directory), stopAtPath)
}
return resp, nil
}

2
weed/shell/command_remote_unmount.go

@ -100,7 +100,7 @@ func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir stri
oldEntry := lookupResp.Entry
deleteError := filer_pb.DoRemove(ctx, client, parent, name, true, true, true, false, nil)
deleteError := filer_pb.DoRemove(ctx, client, parent, name, true, true, true, false, nil, false, "")
if deleteError != nil {
return fmt.Errorf("delete %s: %v", dir, deleteError)
}

Loading…
Cancel
Save