You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

106 lines
2.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package wdclient
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "math/rand"
  10. )
  11. type MasterClient struct {
  12. ctx context.Context
  13. name string
  14. currentMaster string
  15. masters []string
  16. vidMap
  17. }
  18. func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
  19. return &MasterClient{
  20. ctx: ctx,
  21. name: clientName,
  22. masters: masters,
  23. vidMap: newVidMap(),
  24. }
  25. }
  26. func (mc *MasterClient) GetMaster() string {
  27. return mc.currentMaster
  28. }
  29. func (mc *MasterClient) WaitUntilConnected() {
  30. for mc.currentMaster == "" {
  31. time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
  32. }
  33. }
  34. func (mc *MasterClient) KeepConnectedToMaster() {
  35. glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
  36. for {
  37. mc.tryAllMasters()
  38. time.Sleep(time.Second)
  39. }
  40. }
  41. func (mc *MasterClient) tryAllMasters() {
  42. for _, master := range mc.masters {
  43. glog.V(0).Infof("Connecting to %v", master)
  44. withMasterClient(master, func(client master_pb.SeaweedClient) error {
  45. stream, err := client.KeepConnected(context.Background())
  46. if err != nil {
  47. glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
  48. return err
  49. }
  50. if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
  51. glog.V(0).Infof("failed to send to %s: %v", master, err)
  52. return err
  53. }
  54. for {
  55. if volumeLocation, err := stream.Recv(); err != nil {
  56. glog.V(0).Infof("failed to receive from %s: %v", master, err)
  57. return err
  58. } else {
  59. loc := Location{
  60. Url: volumeLocation.Url,
  61. PublicUrl: volumeLocation.PublicUrl,
  62. }
  63. for _, newVid := range volumeLocation.NewVids {
  64. mc.addLocation(newVid, loc)
  65. }
  66. for _, deletedVid := range volumeLocation.DeletedVids {
  67. mc.deleteLocation(deletedVid, loc)
  68. }
  69. if mc.currentMaster == "" {
  70. glog.V(0).Infof("Connected to %v", master)
  71. mc.currentMaster = master
  72. }
  73. }
  74. }
  75. })
  76. mc.currentMaster = ""
  77. }
  78. }
  79. func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
  80. grpcConnection, err := util.GrpcDial(master)
  81. if err != nil {
  82. return fmt.Errorf("fail to dial %s: %v", master, err)
  83. }
  84. defer grpcConnection.Close()
  85. client := master_pb.NewSeaweedClient(grpcConnection)
  86. return fn(client)
  87. }