You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
5.7 KiB

11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
7 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  11. "google.golang.org/protobuf/proto"
  12. "math"
  13. "sync/atomic"
  14. "time"
  15. )
  16. func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
  17. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  18. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  19. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  20. return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  21. if len(buf) == 0 {
  22. return
  23. }
  24. startTime, stopTime = startTime.UTC(), stopTime.UTC()
  25. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  26. // TODO append block with more metadata
  27. for {
  28. if err := b.appendToFile(targetFile, buf); err != nil {
  29. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  30. time.Sleep(737 * time.Millisecond)
  31. } else {
  32. break
  33. }
  34. }
  35. atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
  36. b.accessLock.Lock()
  37. defer b.accessLock.Unlock()
  38. p := topic.FromPbPartition(partition)
  39. if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
  40. localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
  41. }
  42. println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
  43. }
  44. }
  45. func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
  46. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  47. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  48. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  49. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  50. return b.MasterClient.LookupFileId(fileId)
  51. }
  52. eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  53. for pos := 0; pos+4 < len(buf); {
  54. size := util.BytesToUint32(buf[pos : pos+4])
  55. if pos+4+int(size) > len(buf) {
  56. err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
  57. return
  58. }
  59. entryData := buf[pos+4 : pos+4+int(size)]
  60. logEntry := &filer_pb.LogEntry{}
  61. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  62. pos += 4 + int(size)
  63. err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  64. return
  65. }
  66. if logEntry.TsNs < starTsNs {
  67. pos += 4 + int(size)
  68. continue
  69. }
  70. if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
  71. println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
  72. return
  73. }
  74. if _, err = eachLogEntryFn(logEntry); err != nil {
  75. err = fmt.Errorf("process log entry %v: %v", logEntry, err)
  76. return
  77. }
  78. processedTsNs = logEntry.TsNs
  79. pos += 4 + int(size)
  80. }
  81. return
  82. }
  83. eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  84. if len(entry.Content) > 0 {
  85. glog.Warningf("this should not happen. unexpected content in %s/%s", partitionDir, entry.Name)
  86. return
  87. }
  88. var urlStrings []string
  89. for _, chunk := range entry.Chunks {
  90. if chunk.Size == 0 {
  91. continue
  92. }
  93. if chunk.IsChunkManifest {
  94. glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
  95. return
  96. }
  97. urlStrings, err = lookupFileIdFn(chunk.FileId)
  98. if err != nil {
  99. err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
  100. return
  101. }
  102. if len(urlStrings) == 0 {
  103. err = fmt.Errorf("no url found for %s", chunk.FileId)
  104. return
  105. }
  106. // try one of the urlString until util.Get(urlString) succeeds
  107. var processed bool
  108. for _, urlString := range urlStrings {
  109. // TODO optimization opportunity: reuse the buffer
  110. var data []byte
  111. if data, _, err = util.Get(urlString); err == nil {
  112. processed = true
  113. if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
  114. return
  115. }
  116. break
  117. }
  118. }
  119. if !processed {
  120. err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
  121. return
  122. }
  123. }
  124. return
  125. }
  126. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  127. startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
  128. startTsNs := startPosition.Time.UnixNano()
  129. stopTime := time.Unix(0, stopTsNs)
  130. var processedTsNs int64
  131. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  132. return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  133. if entry.IsDirectory {
  134. return nil
  135. }
  136. if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
  137. isDone = true
  138. return nil
  139. }
  140. if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
  141. return nil
  142. }
  143. if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
  144. return err
  145. }
  146. return nil
  147. }, startFileName, true, math.MaxInt32)
  148. })
  149. lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
  150. return
  151. }
  152. }