You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

44 lines
1.3 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "time"
  9. )
  10. func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
  11. localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetInit().Topic),
  12. topic.FromPbPartition(req.GetInit().Partition))
  13. if localTopicPartition == nil {
  14. stream.Send(&mq_pb.SubscribeResponse{
  15. Message: &mq_pb.SubscribeResponse_Ctrl{
  16. Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
  17. Error: "not initialized",
  18. },
  19. },
  20. })
  21. return nil
  22. }
  23. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  24. localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
  25. value := logEntry.GetData()
  26. if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
  27. Data: &mq_pb.DataMessage{
  28. Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
  29. Value: value,
  30. },
  31. }}); err != nil {
  32. glog.Errorf("Error sending setup response: %v", err)
  33. return err
  34. }
  35. return nil
  36. })
  37. return nil
  38. }