You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

78 lines
2.4 KiB

  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "io"
  9. "math/rand"
  10. "time"
  11. )
  12. func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
  13. glog.V(0).Infof("PublishFollowMe %v", request)
  14. go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
  15. followerId := rand.Int31()
  16. subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
  17. Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
  18. Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
  19. ConsumerGroup: string(b.option.BrokerAddress()),
  20. ConsumerId: fmt.Sprintf("followMe-%d", followerId),
  21. FollowerId: followerId,
  22. Topic: request.Topic,
  23. PartitionOffset: &mq_pb.PartitionOffset{
  24. Partition: request.Partition,
  25. StartTsNs: 0,
  26. StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  27. },
  28. },
  29. },
  30. })
  31. if err != nil {
  32. glog.Errorf("FollowInMemoryMessages error: %v", err)
  33. return err
  34. }
  35. b.doFollowInMemoryMessage(context.Background(), subscribeClient)
  36. return nil
  37. })
  38. return &mq_pb.PublishFollowMeResponse{}, nil
  39. }
  40. func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
  41. for {
  42. resp, err := client.Recv()
  43. if err != nil {
  44. if err != io.EOF {
  45. glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
  46. }
  47. return
  48. }
  49. if resp == nil {
  50. glog.V(0).Infof("doFollowInMemoryMessage nil response")
  51. return
  52. }
  53. if resp.Message != nil {
  54. // process ctrl message or data message
  55. switch m:= resp.Message.(type) {
  56. case *mq_pb.FollowInMemoryMessagesResponse_Data:
  57. // process data message
  58. print("d")
  59. case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
  60. // process ctrl message
  61. if m.Ctrl.FlushedSequence > 0 {
  62. flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
  63. glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
  64. }
  65. if m.Ctrl.FollowerChangedToId != 0 {
  66. // follower changed
  67. glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
  68. return
  69. }
  70. }
  71. }
  72. }
  73. }