You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

95 lines
2.3 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package wdclient
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type MasterClient struct {
  11. ctx context.Context
  12. name string
  13. currentMaster string
  14. masters []string
  15. vidMap
  16. }
  17. func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
  18. return &MasterClient{
  19. ctx: ctx,
  20. name: clientName,
  21. masters: masters,
  22. vidMap: newVidMap(),
  23. }
  24. }
  25. func (mc *MasterClient) GetMaster() string {
  26. return mc.currentMaster
  27. }
  28. func (mc *MasterClient) KeepConnectedToMaster() {
  29. glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
  30. for {
  31. mc.tryAllMasters()
  32. time.Sleep(time.Second)
  33. }
  34. }
  35. func (mc *MasterClient) tryAllMasters() {
  36. for _, master := range mc.masters {
  37. glog.V(0).Infof("Connecting to %v", master)
  38. withMasterClient(master, func(client master_pb.SeaweedClient) error {
  39. stream, err := client.KeepConnected(context.Background())
  40. if err != nil {
  41. glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
  42. return err
  43. }
  44. glog.V(0).Infof("Connected to %v", master)
  45. mc.currentMaster = master
  46. if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
  47. glog.V(0).Infof("failed to send to %s: %v", master, err)
  48. return err
  49. }
  50. for {
  51. if volumeLocation, err := stream.Recv(); err != nil {
  52. glog.V(0).Infof("failed to receive from %s: %v", master, err)
  53. return err
  54. } else {
  55. glog.V(0).Infof("volume location: %+v", volumeLocation)
  56. loc := Location{
  57. Url: volumeLocation.Url,
  58. PublicUrl: volumeLocation.PublicUrl,
  59. }
  60. for _, newVid := range volumeLocation.NewVids {
  61. mc.addLocation(newVid, loc)
  62. }
  63. for _, deletedVid := range volumeLocation.DeletedVids {
  64. mc.deleteLocation(deletedVid, loc)
  65. }
  66. }
  67. }
  68. })
  69. mc.currentMaster = ""
  70. }
  71. }
  72. func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
  73. grpcConnection, err := util.GrpcDial(master)
  74. if err != nil {
  75. return fmt.Errorf("fail to dial %s: %v", master, err)
  76. }
  77. defer grpcConnection.Close()
  78. client := master_pb.NewSeaweedClient(grpcConnection)
  79. return fn(client)
  80. }