diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 0848088ef..b4f4e46ff 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -22,7 +23,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return findErr } - isCollection := f.isBucket(entry) + isDeleteCollection := f.isBucket(entry) var chunks []*filer_pb.FileChunk var hardLinkIds []HardLinkId @@ -31,7 +32,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR // delete the folder children, not including the folder itself var dirChunks []*filer_pb.FileChunk var dirHardLinkIds []HardLinkId - dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) + dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isFromOtherCluster, signatures) if err != nil { glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) @@ -46,7 +47,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return fmt.Errorf("delete file %s: %v", p, err) } - if shouldDeleteChunks && !isCollection { + if shouldDeleteChunks && !isDeleteCollection { f.DirectDeleteChunks(chunks) } // A case not handled: @@ -55,10 +56,15 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR f.maybeDeleteHardLinks(hardLinkIds) } - if isCollection { + if isDeleteCollection { collectionName := entry.Name() f.doDeleteCollection(collectionName) f.deleteBucket(collectionName) + } else { + parent, _ := p.DirAndName() + if err := f.removeEmptyParentFolder(ctx, util.FullPath(parent)); err != nil { + glog.Errorf("clean up empty folders for %s : %v", p, err) + } } return nil @@ -153,3 +159,25 @@ func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) { } } } + +func (f *Filer) removeEmptyParentFolder(ctx context.Context, dir util.FullPath) error { + if !strings.HasPrefix(string(dir), f.DirBucketsPath) { + return nil + } + parent, _ := dir.DirAndName() + if parent == f.DirBucketsPath { + // should not delete bucket itself + return nil + } + entries, err := f.ListDirectoryEntries(ctx, dir, "", false, 1, "") + if err != nil { + return err + } + if len(entries) > 0 { + return nil + } + if err := f.Store.DeleteEntry(ctx, dir); err != nil { + return err + } + return f.removeEmptyParentFolder(ctx, util.FullPath(parent)) +}