You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

49 lines
1.4 KiB

  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "io"
  7. )
  8. func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) error {
  9. req, err := stream.Recv()
  10. if err != nil {
  11. return err
  12. }
  13. initMessage := req.GetInit()
  14. if initMessage == nil {
  15. return fmt.Errorf("missing init message")
  16. }
  17. // t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  18. // follow each published messages
  19. for {
  20. // receive a message
  21. req, err := stream.Recv()
  22. if err != nil {
  23. if err == io.EOF {
  24. break
  25. }
  26. glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
  27. return err
  28. }
  29. // Process the received message
  30. if dataMessage := req.GetData(); dataMessage != nil {
  31. // send back the ack
  32. if err := stream.Send(&mq_pb.PublishFollowMeResponse{
  33. AckTsNs: dataMessage.TsNs,
  34. }); err != nil {
  35. // TODO save un-acked messages to disk
  36. glog.Errorf("Error sending response %v: %v", dataMessage, err)
  37. }
  38. println("ack", string(dataMessage.Key), dataMessage.TsNs)
  39. } else if closeMessage := req.GetClose(); closeMessage != nil {
  40. glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  41. break
  42. }
  43. }
  44. return nil
  45. }