You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

83 lines
2.2 KiB

1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
  1. package agent
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/protobuf/proto"
  11. )
  12. func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
  13. // the first message is the subscribe request
  14. // it should only contain the session id
  15. initMessage, err := stream.Recv()
  16. if err != nil {
  17. return err
  18. }
  19. sessionId := SessionId(initMessage.SessionId)
  20. a.subscribersLock.RLock()
  21. subscriberEntry, found := a.subscribers[sessionId]
  22. a.subscribersLock.RUnlock()
  23. if !found {
  24. return fmt.Errorf("subscribe session id %d not found", sessionId)
  25. }
  26. defer func() {
  27. a.subscribersLock.Lock()
  28. delete(a.subscribers, sessionId)
  29. a.subscribersLock.Unlock()
  30. }()
  31. var lastErr error
  32. executors := util.NewLimitedConcurrentExecutor(int(subscriberEntry.entry.SubscriberConfig.SlidingWindowSize))
  33. subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
  34. executors.Execute(func() {
  35. record := &schema_pb.RecordValue{}
  36. err := proto.Unmarshal(m.Data.Value, record)
  37. if err != nil {
  38. glog.V(0).Infof("unmarshal record value: %v", err)
  39. if lastErr == nil {
  40. lastErr = err
  41. }
  42. return
  43. }
  44. if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{
  45. Key: m.Data.Key,
  46. Value: record,
  47. TsNs: m.Data.TsNs,
  48. }); sendErr != nil {
  49. glog.V(0).Infof("send record: %v", sendErr)
  50. if lastErr == nil {
  51. lastErr = sendErr
  52. }
  53. }
  54. })
  55. })
  56. go func() {
  57. subErr := subscriberEntry.entry.Subscribe()
  58. if subErr != nil {
  59. glog.V(0).Infof("subscriber %d subscribe: %v", sessionId, subErr)
  60. if lastErr == nil {
  61. lastErr = subErr
  62. }
  63. }
  64. }()
  65. for {
  66. m, err := stream.Recv()
  67. if err != nil {
  68. glog.V(0).Infof("subscriber %d receive: %v", sessionId, err)
  69. return err
  70. }
  71. if m != nil {
  72. subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{
  73. Key: m.AckKey,
  74. Offset: m.AckSequence,
  75. }
  76. }
  77. }
  78. }