You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
2.9 KiB

10 months ago
10 months ago
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "io"
  9. "math/rand"
  10. "sync"
  11. "time"
  12. )
  13. func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
  14. glog.V(0).Infof("PublishFollowMe %v", request)
  15. var wg sync.WaitGroup
  16. wg.Add(1)
  17. var ret error
  18. go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
  19. followerId := rand.Int31()
  20. subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
  21. Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
  22. Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
  23. ConsumerGroup: string(b.option.BrokerAddress()),
  24. ConsumerId: fmt.Sprintf("followMe@%s-%d", b.option.BrokerAddress(), followerId),
  25. FollowerId: followerId,
  26. Topic: request.Topic,
  27. PartitionOffset: &mq_pb.PartitionOffset{
  28. Partition: request.Partition,
  29. StartTsNs: 0,
  30. StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  31. },
  32. },
  33. },
  34. })
  35. if err != nil {
  36. glog.Errorf("FollowInMemoryMessages error: %v", err)
  37. ret = err
  38. return err
  39. }
  40. // receive first hello message
  41. resp, err := subscribeClient.Recv()
  42. if err != nil {
  43. return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
  44. }
  45. if resp == nil {
  46. glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
  47. return io.ErrUnexpectedEOF
  48. }
  49. wg.Done()
  50. b.doFollowInMemoryMessage(context.Background(), followerId, subscribeClient)
  51. return nil
  52. })
  53. wg.Wait()
  54. return &mq_pb.PublishFollowMeResponse{}, ret
  55. }
  56. func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, followerId int32, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
  57. for {
  58. resp, err := client.Recv()
  59. if err != nil {
  60. if err != io.EOF {
  61. glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
  62. }
  63. return
  64. }
  65. if resp == nil {
  66. glog.V(0).Infof("doFollowInMemoryMessage nil response")
  67. return
  68. }
  69. if resp.Message != nil {
  70. // process ctrl message or data message
  71. switch m := resp.Message.(type) {
  72. case *mq_pb.FollowInMemoryMessagesResponse_Data:
  73. // process data message
  74. print("d")
  75. case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
  76. // process ctrl message
  77. if m.Ctrl.FlushedSequence > 0 {
  78. flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
  79. glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
  80. }
  81. if m.Ctrl.FollowerChangedToId != 0 {
  82. // follower changed
  83. glog.V(0).Infof("doFollowInMemoryMessage follower changed from %d to %d", followerId, m.Ctrl.FollowerChangedToId)
  84. return
  85. }
  86. default:
  87. glog.V(0).Infof("doFollowInMemoryMessage unknown message type: %v", m)
  88. }
  89. }
  90. }
  91. }