You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

47 lines
1.1 KiB

3 years ago
2 years ago
3 years ago
2 years ago
3 years ago
  1. package broker
  import (
  	"io"

  	"github.com/seaweedfs/seaweedfs/weed/pb/message_fbs"
  	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  )
  6. /*
  7. Messages are buffered in memory and saved to the filer under
  8. /topics/<topic>/<date>/<hour>/<segment>/*.msg
  9. /topics/<topic>/<date>/<hour>/segment
  10. /topics/<topic>/info/segment_<id>.meta
  11. */
  12. func (broker *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  13. println("connected")
  14. for {
  15. request, recvErr := stream.Recv()
  16. if recvErr != nil {
  17. return recvErr
  18. }
  19. print(">")
  20. if request.Control != nil {
  21. }
  22. if request.Data != nil {
  23. if err := broker.processDataMessage(stream, request.Data); err != nil {
  24. return err
  25. }
  26. }
  27. }
  28. return nil
  29. }
  30. func (broker *MessageQueueBroker) processDataMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer, data *mq_pb.PublishRequest_DataMessage) error {
  31. mb := message_fbs.GetRootAsMessageBatch(data.Message, 0)
  32. println("message count:", mb.MessagesLength(), len(data.Message))
  33. m := &message_fbs.Message{}
  34. for i := 0; i < mb.MessagesLength(); i++ {
  35. mb.Messages(m, i)
  36. println(i, ">", string(m.Data()))
  37. }
  38. return nil
  39. }