You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

79 lines
2.0 KiB

1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
1 week ago
  1. package agent
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "google.golang.org/protobuf/proto"
  10. "time"
  11. )
  12. func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
  13. // the first message is the subscribe request
  14. // it should only contain the session id
  15. initMessage, err := stream.Recv()
  16. if err != nil {
  17. return err
  18. }
  19. sessionId := SessionId(initMessage.SessionId)
  20. a.subscribersLock.RLock()
  21. subscriberEntry, found := a.subscribers[sessionId]
  22. a.subscribersLock.RUnlock()
  23. if !found {
  24. return fmt.Errorf("subscribe session id %d not found", sessionId)
  25. }
  26. defer func() {
  27. subscriberEntry.lastActiveTsNs = time.Now().UnixNano()
  28. }()
  29. subscriberEntry.lastActiveTsNs = 0
  30. var lastErr error
  31. subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
  32. record := &schema_pb.RecordValue{}
  33. err := proto.Unmarshal(m.Data.Value, record)
  34. if err != nil {
  35. glog.V(0).Infof("unmarshal record value: %v", err)
  36. if lastErr == nil {
  37. lastErr = err
  38. }
  39. return
  40. }
  41. if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{
  42. Key: m.Data.Key,
  43. Value: record,
  44. TsNs: m.Data.TsNs,
  45. }); sendErr != nil {
  46. glog.V(0).Infof("send record: %v", sendErr)
  47. if lastErr == nil {
  48. lastErr = sendErr
  49. }
  50. }
  51. })
  52. go func() {
  53. subErr := subscriberEntry.entry.Subscribe()
  54. if subErr != nil {
  55. glog.V(0).Infof("subscriber %d subscribe: %v", sessionId, subErr)
  56. if lastErr == nil {
  57. lastErr = subErr
  58. }
  59. }
  60. }()
  61. for {
  62. m, err := stream.Recv()
  63. if err != nil {
  64. glog.V(0).Infof("subscriber %d receive: %v", sessionId, err)
  65. return err
  66. }
  67. if m != nil {
  68. subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{
  69. Key: m.AckKey,
  70. Offset: m.AckSequence,
  71. }
  72. }
  73. }
  74. }