You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
2.7 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package wdclient
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "math/rand"
  10. )
  11. type MasterClient struct {
  12. ctx context.Context
  13. name string
  14. currentMaster string
  15. masters []string
  16. vidMap
  17. }
  18. func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
  19. return &MasterClient{
  20. ctx: ctx,
  21. name: clientName,
  22. masters: masters,
  23. vidMap: newVidMap(),
  24. }
  25. }
  26. func (mc *MasterClient) GetMaster() string {
  27. return mc.currentMaster
  28. }
  29. func (mc *MasterClient) WaitUntilConnected() {
  30. for mc.currentMaster == "" {
  31. time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
  32. }
  33. }
  34. func (mc *MasterClient) KeepConnectedToMaster() {
  35. glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
  36. for {
  37. mc.tryAllMasters()
  38. time.Sleep(time.Second)
  39. }
  40. }
  41. func (mc *MasterClient) tryAllMasters() {
  42. for _, master := range mc.masters {
  43. glog.V(0).Infof("Connecting to master %v", master)
  44. gprcErr := withMasterClient(master, func(client master_pb.SeaweedClient) error {
  45. stream, err := client.KeepConnected(context.Background())
  46. if err != nil {
  47. glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
  48. return err
  49. }
  50. if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
  51. glog.V(0).Infof("failed to send to %s: %v", master, err)
  52. return err
  53. }
  54. if mc.currentMaster == "" {
  55. glog.V(0).Infof("Connected to %v", master)
  56. mc.currentMaster = master
  57. }
  58. for {
  59. if volumeLocation, err := stream.Recv(); err != nil {
  60. glog.V(0).Infof("failed to receive from %s: %v", master, err)
  61. return err
  62. } else {
  63. loc := Location{
  64. Url: volumeLocation.Url,
  65. PublicUrl: volumeLocation.PublicUrl,
  66. }
  67. for _, newVid := range volumeLocation.NewVids {
  68. mc.addLocation(newVid, loc)
  69. }
  70. for _, deletedVid := range volumeLocation.DeletedVids {
  71. mc.deleteLocation(deletedVid, loc)
  72. }
  73. }
  74. }
  75. })
  76. if gprcErr != nil {
  77. glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr)
  78. }
  79. mc.currentMaster = ""
  80. }
  81. }
  82. func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
  83. masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0)
  84. if parseErr != nil {
  85. return fmt.Errorf("failed to parse master grpc %v", master)
  86. }
  87. grpcConnection, err := util.GrpcDial(masterGrpcAddress)
  88. if err != nil {
  89. return fmt.Errorf("fail to dial %s: %v", master, err)
  90. }
  91. defer grpcConnection.Close()
  92. client := master_pb.NewSeaweedClient(grpcConnection)
  93. return fn(client)
  94. }