You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

157 lines
5.2 KiB

5 years ago
4 years ago
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type HardLinkId []byte
  11. func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
  12. if p == "/" {
  13. return nil
  14. }
  15. entry, findErr := f.FindEntry(ctx, p)
  16. if findErr != nil {
  17. return findErr
  18. }
  19. isDeleteCollection := f.isBucket(entry)
  20. var chunks []*filer_pb.FileChunk
  21. var hardLinkIds []HardLinkId
  22. chunks = append(chunks, entry.Chunks...)
  23. if entry.IsDirectory() {
  24. // delete the folder children, not including the folder itself
  25. var dirChunks []*filer_pb.FileChunk
  26. var dirHardLinkIds []HardLinkId
  27. dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures)
  28. if err != nil {
  29. glog.V(0).Infof("delete directory %s: %v", p, err)
  30. return fmt.Errorf("delete directory %s: %v", p, err)
  31. }
  32. chunks = append(chunks, dirChunks...)
  33. hardLinkIds = append(hardLinkIds, dirHardLinkIds...)
  34. }
  35. // delete the file or folder
  36. err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
  37. if err != nil {
  38. return fmt.Errorf("delete file %s: %v", p, err)
  39. }
  40. if shouldDeleteChunks && !isDeleteCollection {
  41. f.DirectDeleteChunks(chunks)
  42. }
  43. // A case not handled:
  44. // what if the chunk is in a different collection?
  45. if shouldDeleteChunks {
  46. f.maybeDeleteHardLinks(hardLinkIds)
  47. }
  48. if isDeleteCollection {
  49. collectionName := entry.Name()
  50. f.doDeleteCollection(collectionName)
  51. f.deleteBucket(collectionName)
  52. }
  53. return nil
  54. }
  55. func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
  56. lastFileName := ""
  57. includeLastFile := false
  58. if !isDeletingBucket {
  59. for {
  60. entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
  61. if err != nil {
  62. glog.Errorf("list folder %s: %v", entry.FullPath, err)
  63. return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
  64. }
  65. if lastFileName == "" && !isRecursive && len(entries) > 0 {
  66. // only for first iteration in the loop
  67. glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
  68. return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
  69. }
  70. for _, sub := range entries {
  71. lastFileName = sub.Name()
  72. var dirChunks []*filer_pb.FileChunk
  73. var dirHardLinkIds []HardLinkId
  74. if sub.IsDirectory() {
  75. subIsDeletingBucket := f.isBucket(sub)
  76. dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil)
  77. chunks = append(chunks, dirChunks...)
  78. hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
  79. } else {
  80. f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
  81. if len(sub.HardLinkId) != 0 {
  82. // hard link chunk data are deleted separately
  83. hardlinkIds = append(hardlinkIds, sub.HardLinkId)
  84. } else {
  85. chunks = append(chunks, sub.Chunks...)
  86. }
  87. }
  88. if err != nil && !ignoreRecursiveError {
  89. return nil, nil, err
  90. }
  91. }
  92. if len(entries) < PaginationSize {
  93. break
  94. }
  95. }
  96. }
  97. glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
  98. if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
  99. return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
  100. }
  101. f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
  102. return chunks, hardlinkIds, nil
  103. }
  104. func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
  105. glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
  106. if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
  107. return fmt.Errorf("filer store delete: %v", storeDeletionErr)
  108. }
  109. if !entry.IsDirectory() {
  110. f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
  111. }
  112. return nil
  113. }
  114. func (f *Filer) doDeleteCollection(collectionName string) (err error) {
  115. return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  116. _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
  117. Name: collectionName,
  118. })
  119. if err != nil {
  120. glog.Infof("delete collection %s: %v", collectionName, err)
  121. }
  122. return err
  123. })
  124. }
  125. func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
  126. for _, hardLinkId := range hardLinkIds {
  127. if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil {
  128. glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
  129. }
  130. }
  131. }