You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
2.7 KiB

5 years ago
5 years ago
  1. package msgclient
  2. import (
  3. "context"
  4. "github.com/OneOfOne/xxhash"
  5. "google.golang.org/grpc"
  6. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. )
  9. type Publisher struct {
  10. publishClients []messaging_pb.SeaweedMessaging_PublishClient
  11. topicConfiguration *messaging_pb.TopicConfiguration
  12. messageCount uint64
  13. publisherId string
  14. }
  15. /*
  16. func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
  17. // read topic configuration
  18. topicConfiguration := &messaging_pb.TopicConfiguration{
  19. PartitionCount: 4,
  20. }
  21. publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
  22. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  23. client, err := setupPublisherClient(broker.TopicPartition{
  24. Namespace: namespace,
  25. Topic: topic,
  26. Partition: int32(i),
  27. })
  28. if err != nil {
  29. return nil, err
  30. }
  31. publishClients[i] = client
  32. }
  33. return &Publisher{
  34. publishClients: publishClients,
  35. topicConfiguration: topicConfiguration,
  36. }, nil
  37. }
  38. */
  39. func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
  40. stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
  41. if err != nil {
  42. return nil, err
  43. }
  44. // send init message
  45. err = stream.Send(&messaging_pb.PublishRequest{
  46. Init: &messaging_pb.PublishRequest_InitMessage{
  47. Namespace: tp.Namespace,
  48. Topic: tp.Topic,
  49. Partition: tp.Partition,
  50. },
  51. })
  52. if err != nil {
  53. return nil, err
  54. }
  55. // process init response
  56. initResponse, err := stream.Recv()
  57. if err != nil {
  58. return nil, err
  59. }
  60. if initResponse.Redirect != nil {
  61. // TODO follow redirection
  62. }
  63. if initResponse.Config != nil {
  64. }
  65. // setup looks for control messages
  66. doneChan := make(chan error, 1)
  67. go func() {
  68. for {
  69. in, err := stream.Recv()
  70. if err != nil {
  71. doneChan <- err
  72. return
  73. }
  74. if in.Redirect != nil {
  75. }
  76. if in.Config != nil {
  77. }
  78. }
  79. }()
  80. return stream, nil
  81. }
  82. func (p *Publisher) Publish(m *messaging_pb.Message) error {
  83. hashValue := p.messageCount
  84. p.messageCount++
  85. if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
  86. if m.Key != nil {
  87. hashValue = xxhash.Checksum64(m.Key)
  88. }
  89. } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
  90. hashValue = xxhash.Checksum64(m.Key)
  91. } else {
  92. // round robin
  93. }
  94. idx := int(hashValue) % len(p.publishClients)
  95. if idx < 0 {
  96. idx += len(p.publishClients)
  97. }
  98. return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
  99. Data: m,
  100. })
  101. }
  102. func (p *Publisher) Shutdown() {
  103. for _, client := range p.publishClients {
  104. client.CloseSend()
  105. }
  106. }