You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

178 lines
5.7 KiB

11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
7 months ago
7 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  11. "google.golang.org/protobuf/proto"
  12. "math"
  13. "sync/atomic"
  14. "time"
  15. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  16. )
  17. func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
  18. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  19. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  20. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  21. return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  22. if len(buf) == 0 {
  23. return
  24. }
  25. startTime, stopTime = startTime.UTC(), stopTime.UTC()
  26. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  27. // TODO append block with more metadata
  28. for {
  29. if err := b.appendToFile(targetFile, buf); err != nil {
  30. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  31. time.Sleep(737 * time.Millisecond)
  32. } else {
  33. break
  34. }
  35. }
  36. atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
  37. b.accessLock.Lock()
  38. defer b.accessLock.Unlock()
  39. p := topic.FromPbPartition(partition)
  40. if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
  41. localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
  42. }
  43. glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
  44. }
  45. }
  46. func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
  47. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  48. partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  49. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
  50. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  51. return b.MasterClient.LookupFileId(fileId)
  52. }
  53. eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  54. for pos := 0; pos+4 < len(buf); {
  55. size := util.BytesToUint32(buf[pos : pos+4])
  56. if pos+4+int(size) > len(buf) {
  57. err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
  58. return
  59. }
  60. entryData := buf[pos+4 : pos+4+int(size)]
  61. logEntry := &filer_pb.LogEntry{}
  62. if err = proto.Unmarshal(entryData, logEntry); err != nil {
  63. pos += 4 + int(size)
  64. err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
  65. return
  66. }
  67. if logEntry.TsNs < starTsNs {
  68. pos += 4 + int(size)
  69. continue
  70. }
  71. if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
  72. println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
  73. return
  74. }
  75. if _, err = eachLogEntryFn(logEntry); err != nil {
  76. err = fmt.Errorf("process log entry %v: %v", logEntry, err)
  77. return
  78. }
  79. processedTsNs = logEntry.TsNs
  80. pos += 4 + int(size)
  81. }
  82. return
  83. }
  84. eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
  85. if len(entry.Content) > 0 {
  86. // skip .offset files
  87. return
  88. }
  89. var urlStrings []string
  90. for _, chunk := range entry.Chunks {
  91. if chunk.Size == 0 {
  92. continue
  93. }
  94. if chunk.IsChunkManifest {
  95. glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
  96. return
  97. }
  98. urlStrings, err = lookupFileIdFn(chunk.FileId)
  99. if err != nil {
  100. err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
  101. return
  102. }
  103. if len(urlStrings) == 0 {
  104. err = fmt.Errorf("no url found for %s", chunk.FileId)
  105. return
  106. }
  107. // try one of the urlString until util.Get(urlString) succeeds
  108. var processed bool
  109. for _, urlString := range urlStrings {
  110. // TODO optimization opportunity: reuse the buffer
  111. var data []byte
  112. if data, _, err = util_http.Get(urlString); err == nil {
  113. processed = true
  114. if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
  115. return
  116. }
  117. break
  118. }
  119. }
  120. if !processed {
  121. err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
  122. return
  123. }
  124. }
  125. return
  126. }
  127. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  128. startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
  129. startTsNs := startPosition.Time.UnixNano()
  130. stopTime := time.Unix(0, stopTsNs)
  131. var processedTsNs int64
  132. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  133. return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  134. if entry.IsDirectory {
  135. return nil
  136. }
  137. if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
  138. isDone = true
  139. return nil
  140. }
  141. if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
  142. return nil
  143. }
  144. if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
  145. return err
  146. }
  147. return nil
  148. }, startFileName, true, math.MaxInt32)
  149. })
  150. lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
  151. return
  152. }
  153. }