You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
3.5 KiB

5 years ago
5 years ago
6 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
  1. package filer
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/storage"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. )
  12. func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {
  13. return func(vids []string) (map[string]*operation.LookupResult, error) {
  14. m := make(map[string]*operation.LookupResult)
  15. for _, vid := range vids {
  16. locs, _ := masterClient.GetVidLocations(vid)
  17. var locations []operation.Location
  18. for _, loc := range locs {
  19. locations = append(locations, operation.Location{
  20. Url: loc.Url,
  21. PublicUrl: loc.PublicUrl,
  22. GrpcPort: loc.GrpcPort,
  23. })
  24. }
  25. m[vid] = &operation.LookupResult{
  26. VolumeOrFileId: vid,
  27. Locations: locations,
  28. }
  29. }
  30. return m, nil
  31. }
  32. }
  33. func (f *Filer) loopProcessingDeletion() {
  34. lookupFunc := LookupByMasterClientFn(f.MasterClient)
  35. DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
  36. var deletionCount int
  37. for {
  38. deletionCount = 0
  39. f.fileIdDeletionQueue.Consume(func(fileIds []string) {
  40. for len(fileIds) > 0 {
  41. var toDeleteFileIds []string
  42. if len(fileIds) > DeletionBatchSize {
  43. toDeleteFileIds = fileIds[:DeletionBatchSize]
  44. fileIds = fileIds[DeletionBatchSize:]
  45. } else {
  46. toDeleteFileIds = fileIds
  47. fileIds = fileIds[:0]
  48. }
  49. deletionCount = len(toDeleteFileIds)
  50. _, err := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
  51. if err != nil {
  52. if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
  53. glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
  54. }
  55. } else {
  56. glog.V(2).Infof("deleting fileIds %+v", toDeleteFileIds)
  57. }
  58. }
  59. })
  60. if deletionCount == 0 {
  61. time.Sleep(1123 * time.Millisecond)
  62. }
  63. }
  64. }
  65. func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) {
  66. f.doDeleteChunks(chunks)
  67. }
  68. func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
  69. rule := f.FilerConf.MatchStorageRule(string(fullpath))
  70. if rule.DisableChunkDeletion {
  71. return
  72. }
  73. f.doDeleteChunks(chunks)
  74. }
  75. func (f *Filer) doDeleteChunks(chunks []*filer_pb.FileChunk) {
  76. for _, chunk := range chunks {
  77. if !chunk.IsChunkManifest {
  78. f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
  79. continue
  80. }
  81. dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
  82. if manifestResolveErr != nil {
  83. glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
  84. }
  85. for _, dChunk := range dataChunks {
  86. f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
  87. }
  88. f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
  89. }
  90. }
  91. func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
  92. for _, chunk := range chunks {
  93. f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
  94. }
  95. }
  96. func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
  97. var oldChunks, newChunks []*filer_pb.FileChunk
  98. if oldEntry != nil {
  99. oldChunks = oldEntry.GetChunks()
  100. }
  101. if newEntry != nil {
  102. newChunks = newEntry.GetChunks()
  103. }
  104. toDelete, err := MinusChunks(f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
  105. if err != nil {
  106. glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks)
  107. return
  108. }
  109. f.DeleteChunksNotRecursive(toDelete)
  110. }