You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
4.6 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/aws/aws-sdk-go/aws"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/chrislusf/seaweedfs/weed/server"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/gorilla/mux"
  13. "google.golang.org/grpc/reflection"
  14. "net/http"
  15. "time"
  16. )
  17. var (
  18. mf MasterOptions
  19. )
  20. func init() {
  21. cmdMasterFollower.Run = runMasterFollower // break init cycle
  22. mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
  23. mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 19334, "grpc listen port")
  24. mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to")
  25. mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
  26. mf.ip = aws.String(util.DetectedHostAddress())
  27. mf.metaFolder = aws.String("")
  28. mf.volumeSizeLimitMB = nil
  29. mf.volumePreallocate = nil
  30. mf.defaultReplication = nil
  31. mf.garbageThreshold = aws.Float64(0.1)
  32. mf.whiteList = nil
  33. mf.disableHttp = aws.Bool(false)
  34. mf.metricsAddress = aws.String("")
  35. mf.metricsIntervalSec = aws.Int(0)
  36. mf.raftResumeState = aws.Bool(false)
  37. }
  38. var cmdMasterFollower = &Command{
  39. UsageLine: "master.follower -port=9333 -masters=<master1Host>:<master1Port>",
  40. Short: "start a master follower",
  41. Long: `start a master follower to provide volume=>location mapping service
  42. The master follower does not participate in master election.
  43. It just follow the existing masters, and listen for any volume location changes.
  44. In most cases, the master follower is not needed. In big data centers with thousands of volume
  45. servers. In theory, the master may have trouble to keep up with the write requests and read requests.
  46. The master follower can relieve the master from from read requests, which only needs to
  47. lookup a fileId or volumeId.
  48. The master follower currently can handle fileId lookup requests:
  49. /dir/lookup?volumeId=4
  50. /dir/lookup?fileId=4,49c50924569199
  51. And gRPC API
  52. rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {}
  53. This master follower is stateless and can run from any place.
  54. `,
  55. }
  56. func runMasterFollower(cmd *Command, args []string) bool {
  57. util.LoadConfiguration("security", false)
  58. util.LoadConfiguration("master", false)
  59. startMasterFollower(mf)
  60. return true
  61. }
  62. func startMasterFollower(masterOptions MasterOptions) {
  63. // collect settings from main masters
  64. masters := pb.ServerAddresses(*mf.peers).ToAddresses()
  65. var err error
  66. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
  67. for i := 0; i < 10; i++ {
  68. err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
  69. resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  70. if err != nil {
  71. return fmt.Errorf("get master grpc address %v configuration: %v", masters, err)
  72. }
  73. masterOptions.defaultReplication = &resp.DefaultReplication
  74. masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB))
  75. masterOptions.volumePreallocate = &resp.VolumePreallocate
  76. return nil
  77. })
  78. if err != nil {
  79. glog.V(0).Infof("failed to talk to filer %v: %v", masters, err)
  80. glog.V(0).Infof("wait for %d seconds ...", i+1)
  81. time.Sleep(time.Duration(i+1) * time.Second)
  82. }
  83. }
  84. if err != nil {
  85. glog.Errorf("failed to talk to filer %v: %v", masters, err)
  86. return
  87. }
  88. option := masterOptions.toMasterOption(nil)
  89. option.IsFollower = true
  90. r := mux.NewRouter()
  91. ms := weed_server.NewMasterServer(r, option, masters)
  92. listeningAddress := util.JoinHostPort(*masterOptions.ipBind, *masterOptions.port)
  93. glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
  94. masterListener, e := util.NewListener(listeningAddress, 0)
  95. if e != nil {
  96. glog.Fatalf("Master startup error: %v", e)
  97. }
  98. // starting grpc server
  99. grpcPort := *masterOptions.portGrpc
  100. grpcL, err := util.NewListener(util.JoinHostPort(*masterOptions.ipBind, grpcPort), 0)
  101. if err != nil {
  102. glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
  103. }
  104. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
  105. master_pb.RegisterSeaweedServer(grpcS, ms)
  106. reflection.Register(grpcS)
  107. glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort)
  108. go grpcS.Serve(grpcL)
  109. go ms.MasterClient.KeepConnectedToMaster()
  110. // start http server
  111. httpS := &http.Server{Handler: r}
  112. go httpS.Serve(masterListener)
  113. select {}
  114. }