You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

168 lines
5.3 KiB

12 months ago
12 months ago
12 months ago
11 months ago
12 months ago
12 months ago
11 months ago
12 months ago
12 months ago
11 months ago
12 months ago
11 months ago
12 months ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  11. "google.golang.org/protobuf/proto"
  12. "math"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
  17. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  18. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  19. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  20. return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  21. if len(buf) == 0 {
  22. return
  23. }
  24. startTime, stopTime = startTime.UTC(), stopTime.UTC()
  25. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  26. // TODO append block with more metadata
  27. for {
  28. if err := b.appendToFile(targetFile, buf); err != nil {
  29. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  30. time.Sleep(737 * time.Millisecond)
  31. } else {
  32. break
  33. }
  34. }
  35. atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
  36. }
  37. }
  38. func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
  39. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  40. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  41. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  42. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  43. return b.MasterClient.LookupFileId(fileId)
  44. }
  45. eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  46. for pos := 0; pos+4 < len(buf); {
  47. size := util.BytesToUint32(buf[pos : pos+4])
  48. if pos+4+int(size) > len(buf) {
  49. err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
  50. return
  51. }
  52. entryData := buf[pos+4 : pos+4+int(size)]
  53. logEntry := &filer_pb.LogEntry{}
  54. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  55. pos += 4 + int(size)
  56. err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  57. return
  58. }
  59. if logEntry.TsNs < starTsNs {
  60. pos += 4 + int(size)
  61. continue
  62. }
  63. if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
  64. println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
  65. return
  66. }
  67. if _, err = eachLogEntryFn(logEntry); err != nil {
  68. err = fmt.Errorf("process log entry %v: %v", logEntry, err)
  69. return
  70. }
  71. processedTsNs = logEntry.TsNs
  72. pos += 4 + int(size)
  73. }
  74. return
  75. }
  76. eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  77. if len(entry.Content) > 0 {
  78. glog.Warningf("this should not happen. unexpected content in %s/%s", partitionDir, entry.Name)
  79. return
  80. }
  81. var urlStrings []string
  82. for _, chunk := range entry.Chunks {
  83. if chunk.Size == 0 {
  84. continue
  85. }
  86. if chunk.IsChunkManifest {
  87. glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
  88. return
  89. }
  90. urlStrings, err = lookupFileIdFn(chunk.FileId)
  91. if err != nil {
  92. err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
  93. return
  94. }
  95. if len(urlStrings) == 0 {
  96. err = fmt.Errorf("no url found for %s", chunk.FileId)
  97. return
  98. }
  99. // try one of the urlString until util.Get(urlString) succeeds
  100. var processed bool
  101. for _, urlString := range urlStrings {
  102. // TODO optimization opportunity: reuse the buffer
  103. var data []byte
  104. if data, _, err = util.Get(urlString); err == nil {
  105. processed = true
  106. if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
  107. return
  108. }
  109. break
  110. }
  111. }
  112. if !processed {
  113. err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
  114. return
  115. }
  116. }
  117. return
  118. }
  119. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  120. startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
  121. startTsNs := startPosition.Time.UnixNano()
  122. stopTime := time.Unix(0, stopTsNs)
  123. var processedTsNs int64
  124. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  125. return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  126. if entry.IsDirectory {
  127. return nil
  128. }
  129. if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
  130. isDone = true
  131. return nil
  132. }
  133. if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
  134. return nil
  135. }
  136. if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
  137. return err
  138. }
  139. return nil
  140. }, startFileName, true, math.MaxInt32)
  141. })
  142. lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
  143. return
  144. }
  145. }