You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

179 lines
5.0 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/util"
  5. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  6. "io"
  7. "strings"
  8. "time"
  9. "github.com/golang/protobuf/proto"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  14. )
  15. func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
  16. // process initial request
  17. in, err := stream.Recv()
  18. if err == io.EOF {
  19. return nil
  20. }
  21. if err != nil {
  22. return err
  23. }
  24. var processedTsNs int64
  25. var messageCount int64
  26. subscriberId := in.Init.SubscriberId
  27. // TODO look it up
  28. topicConfig := &messaging_pb.TopicConfiguration{
  29. // IsTransient: true,
  30. }
  31. // get lock
  32. tp := TopicPartition{
  33. Namespace: in.Init.Namespace,
  34. Topic: in.Init.Topic,
  35. Partition: in.Init.Partition,
  36. }
  37. fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
  38. defer func() {
  39. fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
  40. }()
  41. lock := broker.topicManager.RequestLock(tp, topicConfig, false)
  42. defer broker.topicManager.ReleaseLock(tp, false)
  43. isConnected := true
  44. go func() {
  45. for isConnected {
  46. if _, err := stream.Recv(); err != nil {
  47. // println("disconnecting connection to", subscriberId, tp.String())
  48. isConnected = false
  49. lock.cond.Signal()
  50. }
  51. }
  52. }()
  53. lastReadTime := time.Now()
  54. switch in.Init.StartPosition {
  55. case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
  56. lastReadTime = time.Unix(0, in.Init.TimestampNs)
  57. case messaging_pb.SubscriberMessage_InitMessage_LATEST:
  58. case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
  59. lastReadTime = time.Unix(0, 0)
  60. }
  61. // how to process each message
  62. // an error returned will end the subscription
  63. eachMessageFn := func(m *messaging_pb.Message) error {
  64. err := stream.Send(&messaging_pb.BrokerMessage{
  65. Data: m,
  66. })
  67. if err != nil {
  68. glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
  69. }
  70. return err
  71. }
  72. eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
  73. m := &messaging_pb.Message{}
  74. if err = proto.Unmarshal(logEntry.Data, m); err != nil {
  75. glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
  76. return err
  77. }
  78. // fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
  79. if err = eachMessageFn(m); err != nil {
  80. glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
  81. return err
  82. }
  83. if m.IsClose {
  84. // println("processed EOF")
  85. return io.EOF
  86. }
  87. processedTsNs = logEntry.TsNs
  88. messageCount++
  89. return nil
  90. }
  91. // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
  92. for {
  93. if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
  94. if err != io.EOF {
  95. // println("stopping from persisted logs", err.Error())
  96. return err
  97. }
  98. }
  99. if processedTsNs != 0 {
  100. lastReadTime = time.Unix(0, processedTsNs)
  101. }
  102. lastReadTime, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, func() bool {
  103. lock.Mutex.Lock()
  104. lock.cond.Wait()
  105. lock.Mutex.Unlock()
  106. return isConnected
  107. }, eachLogEntryFn)
  108. if err != nil {
  109. if err == log_buffer.ResumeFromDiskError {
  110. continue
  111. }
  112. glog.Errorf("processed to %v: %v", lastReadTime, err)
  113. time.Sleep(3127 * time.Millisecond)
  114. if err != log_buffer.ResumeError {
  115. break
  116. }
  117. }
  118. }
  119. return err
  120. }
  121. func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
  122. startTime = startTime.UTC()
  123. startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
  124. startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
  125. sizeBuf := make([]byte, 4)
  126. startTsNs := startTime.UnixNano()
  127. topicDir := genTopicDir(tp.Namespace, tp.Topic)
  128. partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
  129. return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
  130. dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
  131. return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
  132. if dayEntry.Name == startDate {
  133. hourMinute := util.FileNameBase(hourMinuteEntry.Name)
  134. if strings.Compare(hourMinute, startHourMinute) < 0 {
  135. return nil
  136. }
  137. }
  138. if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
  139. return nil
  140. }
  141. // println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
  142. chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
  143. defer chunkedFileReader.Close()
  144. if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
  145. chunkedFileReader.Close()
  146. if err == io.EOF {
  147. return err
  148. }
  149. return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
  150. }
  151. return nil
  152. }, "", false, 24*60)
  153. }, startDate, true, 366)
  154. }