You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

100 lines
3.3 KiB

8 months ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "io"
  11. "time"
  12. )
  13. func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
  14. var req *mq_pb.SubscribeFollowMeRequest
  15. req, err = stream.Recv()
  16. if err != nil {
  17. return err
  18. }
  19. initMessage := req.GetInit()
  20. if initMessage == nil {
  21. return fmt.Errorf("missing init message")
  22. }
  23. // create an in-memory offset
  24. var lastOffset int64
  25. // follow each published messages
  26. for {
  27. // receive a message
  28. req, err = stream.Recv()
  29. if err != nil {
  30. if err == io.EOF {
  31. err = nil
  32. break
  33. }
  34. glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
  35. break
  36. }
  37. // Process the received message
  38. if ackMessage := req.GetAck(); ackMessage != nil {
  39. lastOffset = ackMessage.TsNs
  40. // println("offset", lastOffset)
  41. } else if closeMessage := req.GetClose(); closeMessage != nil {
  42. glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  43. return nil
  44. } else {
  45. glog.Errorf("unknown message: %v", req)
  46. }
  47. }
  48. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  49. err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
  50. glog.V(0).Infof("shut down follower for %v", initMessage)
  51. return err
  52. }
  53. func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
  54. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition)
  55. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  56. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  57. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
  58. offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)
  59. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  60. data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
  61. if err != nil {
  62. return err
  63. }
  64. if len(data) != 8 {
  65. return fmt.Errorf("no offset found")
  66. }
  67. offset = int64(util.BytesToUint64(data))
  68. return nil
  69. })
  70. return offset, err
  71. }
  72. func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
  73. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  74. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  75. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
  76. offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
  77. offsetBytes := make([]byte, 8)
  78. util.Uint64toBytes(offsetBytes, uint64(offset))
  79. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  80. glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
  81. return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
  82. })
  83. }