You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
2.9 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package messaging
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. )
  11. type MessageBrokerOption struct {
  12. Filers []string
  13. DefaultReplication string
  14. MaxMB int
  15. Port int
  16. Cipher bool
  17. }
  18. type MessageBroker struct {
  19. option *MessageBrokerOption
  20. grpcDialOption grpc.DialOption
  21. }
  22. func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
  23. messageBroker = &MessageBroker{
  24. option: option,
  25. grpcDialOption: grpcDialOption,
  26. }
  27. go messageBroker.loopForEver()
  28. return messageBroker, nil
  29. }
  30. func (broker *MessageBroker) loopForEver() {
  31. for {
  32. broker.checkPeers()
  33. time.Sleep(3 * time.Second)
  34. }
  35. }
  36. func (broker *MessageBroker) checkPeers() {
  37. // contact a filer about masters
  38. var masters []string
  39. for _, filer := range broker.option.Filers {
  40. err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
  41. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  42. if err != nil {
  43. return err
  44. }
  45. masters = append(masters, resp.Masters...)
  46. return nil
  47. })
  48. if err != nil {
  49. fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
  50. return
  51. }
  52. }
  53. // contact each masters for filers
  54. var filers []string
  55. for _, master := range masters {
  56. err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
  57. resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
  58. ClientType: "filer",
  59. })
  60. if err != nil {
  61. return err
  62. }
  63. fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
  64. filers = append(filers, resp.GrpcAddresses...)
  65. return nil
  66. })
  67. if err != nil {
  68. fmt.Printf("failed to list filers: %v\n", err)
  69. return
  70. }
  71. }
  72. // contact each filer about brokers
  73. for _, filer := range filers {
  74. err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
  75. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  76. if err != nil {
  77. return err
  78. }
  79. masters = append(masters, resp.Masters...)
  80. return nil
  81. })
  82. if err != nil {
  83. fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
  84. return
  85. }
  86. }
  87. }
  88. func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
  89. return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
  90. }
  91. func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
  92. return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
  93. return fn(client)
  94. })
  95. }