76 lines
1.6 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package msgclient
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "log"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  9. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  10. )
  11. type PubChannel struct {
  12. client messaging_pb.SeaweedMessaging_PublishClient
  13. grpcConnection *grpc.ClientConn
  14. md5hash hash.Hash
  15. }
  16. func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
  17. tp := broker.TopicPartition{
  18. Namespace: "chan",
  19. Topic: chanName,
  20. Partition: 0,
  21. }
  22. grpcConnection, err := mc.findBroker(tp)
  23. if err != nil {
  24. return nil, err
  25. }
  26. pc, err := setupPublisherClient(grpcConnection, tp)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &PubChannel{
  31. client: pc,
  32. grpcConnection: grpcConnection,
  33. md5hash: md5.New(),
  34. }, nil
  35. }
  36. func (pc *PubChannel) Publish(m []byte) error {
  37. err := pc.client.Send(&messaging_pb.PublishRequest{
  38. Data: &messaging_pb.Message{
  39. Value: m,
  40. },
  41. })
  42. if err == nil {
  43. pc.md5hash.Write(m)
  44. }
  45. return err
  46. }
  47. func (pc *PubChannel) Close() error {
  48. // println("send closing")
  49. if err := pc.client.Send(&messaging_pb.PublishRequest{
  50. Data: &messaging_pb.Message{
  51. IsClose: true,
  52. },
  53. }); err != nil {
  54. log.Printf("err send close: %v", err)
  55. }
  56. // println("receive closing")
  57. if _, err := pc.client.Recv(); err != nil && err != io.EOF {
  58. log.Printf("err receive close: %v", err)
  59. }
  60. // println("close connection")
  61. if err := pc.grpcConnection.Close(); err != nil {
  62. log.Printf("err connection close: %v", err)
  63. }
  64. return nil
  65. }
  66. func (pc *PubChannel) Md5() []byte {
  67. return pc.md5hash.Sum(nil)
  68. }