You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
1.8 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package broker
  2. import (
  3. "io"
  4. "time"
  5. "github.com/golang/protobuf/proto"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. )
  9. func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
  10. // process initial request
  11. in, err := stream.Recv()
  12. if err == io.EOF {
  13. return nil
  14. }
  15. if err != nil {
  16. return err
  17. }
  18. // TODO look it up
  19. topicConfig := &messaging_pb.TopicConfiguration{
  20. IsTransient: true,
  21. }
  22. // send init response
  23. initResponse := &messaging_pb.PublishResponse{
  24. Config: nil,
  25. Redirect: nil,
  26. }
  27. err = stream.Send(initResponse)
  28. if err != nil {
  29. return err
  30. }
  31. if initResponse.Redirect != nil {
  32. return nil
  33. }
  34. // get lock
  35. tp := TopicPartition{
  36. Namespace: in.Init.Namespace,
  37. Topic: in.Init.Topic,
  38. Partition: in.Init.Partition,
  39. }
  40. tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
  41. defer broker.topicLocks.ReleaseLock(tp, true)
  42. updatesChan := make(chan int32)
  43. go func() {
  44. for update := range updatesChan {
  45. if err := stream.Send(&messaging_pb.PublishResponse{
  46. Config: &messaging_pb.PublishResponse_ConfigMessage{
  47. PartitionCount: update,
  48. },
  49. }); err != nil {
  50. glog.V(0).Infof("err sending publish response: %v", err)
  51. return
  52. }
  53. }
  54. }()
  55. // process each message
  56. for {
  57. in, err := stream.Recv()
  58. if err == io.EOF {
  59. return nil
  60. }
  61. if err != nil {
  62. return err
  63. }
  64. if in.Data == nil {
  65. continue
  66. }
  67. m := &messaging_pb.Message{
  68. Timestamp: time.Now().UnixNano(),
  69. Key: in.Data.Key,
  70. Value: in.Data.Value,
  71. Headers: in.Data.Headers,
  72. }
  73. // fmt.Printf("received: %d : %s\n", len(m.Value), string(m.Value))
  74. data, err := proto.Marshal(m)
  75. if err != nil {
  76. glog.Errorf("marshall error: %v\n", err)
  77. continue
  78. }
  79. tl.logBuffer.AddToBuffer(in.Data.Key, data)
  80. }
  81. }