You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

148 lines
3.7 KiB

  1. package command
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "sync"
  7. )
  8. type MetadataProcessFunc func(resp *filer_pb.SubscribeMetadataResponse) error
  9. type MetadataProcessor struct {
  10. activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
  11. activeJobsLock sync.Mutex
  12. activeJobsCond *sync.Cond
  13. concurrencyLimit int
  14. fn MetadataProcessFunc
  15. processedTsWatermark int64
  16. }
  17. func NewMetadataProcessor(fn MetadataProcessFunc, concurrency int) *MetadataProcessor {
  18. t := &MetadataProcessor{
  19. fn: fn,
  20. activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
  21. concurrencyLimit: concurrency,
  22. }
  23. t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
  24. return t
  25. }
  26. func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
  27. if filer_pb.IsEmpty(resp) {
  28. return
  29. }
  30. t.activeJobsLock.Lock()
  31. defer t.activeJobsLock.Unlock()
  32. for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
  33. t.activeJobsCond.Wait()
  34. }
  35. t.activeJobs[resp.TsNs] = resp
  36. go func() {
  37. util.RetryForever("metadata processor", func() error {
  38. return t.fn(resp)
  39. }, func(err error) bool {
  40. glog.Errorf("process %v: %v", resp, err)
  41. return true
  42. })
  43. t.activeJobsLock.Lock()
  44. defer t.activeJobsLock.Unlock()
  45. delete(t.activeJobs, resp.TsNs)
  46. // if is the oldest job, write down the watermark
  47. isOldest := true
  48. for t, _ := range t.activeJobs {
  49. if resp.TsNs > t {
  50. isOldest = false
  51. break
  52. }
  53. }
  54. if isOldest {
  55. t.processedTsWatermark = resp.TsNs
  56. }
  57. t.activeJobsCond.Signal()
  58. }()
  59. }
  60. func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
  61. for _, r := range t.activeJobs {
  62. if shouldWaitFor(resp, r) {
  63. return true
  64. }
  65. }
  66. return false
  67. }
  68. // a is one possible job to schedule
  69. // b is one existing active job
  70. func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
  71. aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
  72. bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
  73. if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
  74. return true
  75. }
  76. if aNewPath != "" {
  77. if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
  78. return true
  79. }
  80. }
  81. if bNewPath != "" {
  82. if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
  83. return true
  84. }
  85. }
  86. if aNewPath != "" && bNewPath != "" {
  87. if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
  88. return true
  89. }
  90. }
  91. return false
  92. }
  93. func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
  94. if bIsDirectory {
  95. if aIsDirectory {
  96. return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
  97. } else {
  98. return aPath.IsUnder(bPath)
  99. }
  100. } else {
  101. if aIsDirectory {
  102. return bPath.IsUnder(aPath)
  103. } else {
  104. return aPath == bPath
  105. }
  106. }
  107. }
  108. func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
  109. oldEntry := resp.EventNotification.OldEntry
  110. newEntry := resp.EventNotification.NewEntry
  111. // create
  112. if filer_pb.IsCreate(resp) {
  113. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  114. isDirectory = newEntry.IsDirectory
  115. return
  116. }
  117. if filer_pb.IsDelete(resp) {
  118. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  119. isDirectory = oldEntry.IsDirectory
  120. return
  121. }
  122. if filer_pb.IsUpdate(resp) {
  123. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  124. isDirectory = newEntry.IsDirectory
  125. return
  126. }
  127. // renaming
  128. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  129. isDirectory = oldEntry.IsDirectory
  130. newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
  131. return
  132. }