You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
4.7 KiB

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/aws/aws-sdk-go/aws"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/chrislusf/seaweedfs/weed/server"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/gorilla/mux"
  13. "google.golang.org/grpc/reflection"
  14. "net/http"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  19. var (
  20. mf MasterOptions
  21. )
  22. func init() {
  23. cmdMasterFollower.Run = runMasterFollower // break init cycle
  24. mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
  25. mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to")
  26. mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
  27. mf.ip = aws.String(util.DetectedHostAddress())
  28. mf.metaFolder = aws.String("")
  29. mf.volumeSizeLimitMB = nil
  30. mf.volumePreallocate = nil
  31. mf.defaultReplication = nil
  32. mf.garbageThreshold = aws.Float64(0.1)
  33. mf.whiteList = nil
  34. mf.disableHttp = aws.Bool(false)
  35. mf.metricsAddress = aws.String("")
  36. mf.metricsIntervalSec = aws.Int(0)
  37. mf.raftResumeState = aws.Bool(false)
  38. }
  39. var cmdMasterFollower = &Command{
  40. UsageLine: "master.follower -port=9333 -masters=<master1Host>:<master1Port>",
  41. Short: "start a master follower",
  42. Long: `start a master follower to provide volume=>location mapping service
  43. The master follower does not participate in master election.
  44. It just follow the existing masters, and listen for any volume location changes.
  45. In most cases, the master follower is not needed. In big data centers with thousands of volume
  46. servers. In theory, the master may have trouble to keep up with the write requests and read requests.
  47. The master follower can relieve the master from from read requests, which only needs to
  48. lookup a fileId or volumeId.
  49. The master follower currently can handle fileId lookup requests:
  50. /dir/lookup?volumeId=4
  51. /dir/lookup?fileId=4,49c50924569199
  52. And gRPC API
  53. rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {}
  54. This master follower is stateless and can run from any place.
  55. `,
  56. }
  57. func runMasterFollower(cmd *Command, args []string) bool {
  58. util.LoadConfiguration("security", false)
  59. util.LoadConfiguration("master", false)
  60. startMasterFollower(mf)
  61. return true
  62. }
  63. func startMasterFollower(masterOptions MasterOptions) {
  64. // collect settings from main masters
  65. masters := strings.Split(*mf.peers, ",")
  66. masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters)
  67. if err != nil {
  68. glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
  69. return
  70. }
  71. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
  72. for i := 0; i < 10; i++ {
  73. err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error {
  74. resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  75. if err != nil {
  76. return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err)
  77. }
  78. masterOptions.defaultReplication = &resp.DefaultReplication
  79. masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB))
  80. masterOptions.volumePreallocate = &resp.VolumePreallocate
  81. return nil
  82. })
  83. if err != nil {
  84. glog.V(0).Infof("failed to talk to filer %v: %v", masterGrpcAddresses, err)
  85. glog.V(0).Infof("wait for %d seconds ...", i+1)
  86. time.Sleep(time.Duration(i+1) * time.Second)
  87. }
  88. }
  89. if err != nil {
  90. glog.Errorf("failed to talk to filer %v: %v", masterGrpcAddresses, err)
  91. return
  92. }
  93. option := masterOptions.toMasterOption(nil)
  94. option.IsFollower = true
  95. r := mux.NewRouter()
  96. ms := weed_server.NewMasterServer(r, option, masters)
  97. listeningAddress := *masterOptions.ipBind + ":" + strconv.Itoa(*masterOptions.port)
  98. glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
  99. masterListener, e := util.NewListener(listeningAddress, 0)
  100. if e != nil {
  101. glog.Fatalf("Master startup error: %v", e)
  102. }
  103. // starting grpc server
  104. grpcPort := *masterOptions.port + 10000
  105. grpcL, err := util.NewListener(*masterOptions.ipBind+":"+strconv.Itoa(grpcPort), 0)
  106. if err != nil {
  107. glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
  108. }
  109. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
  110. master_pb.RegisterSeaweedServer(grpcS, ms)
  111. reflection.Register(grpcS)
  112. glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort)
  113. go grpcS.Serve(grpcL)
  114. go ms.MasterClient.KeepConnectedToMaster()
  115. // start http server
  116. httpS := &http.Server{Handler: r}
  117. go httpS.Serve(masterListener)
  118. select {}
  119. }