diff --git a/weed/command/master.go b/weed/command/master.go index 5b45c9627..9a0ae7eb4 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,21 +1,20 @@ package command import ( + "github.com/chrislusf/raft/protobuf" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" "github.com/spf13/viper" + "google.golang.org/grpc/reflection" "net/http" "os" "runtime" "strconv" "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" - "google.golang.org/grpc/reflection" ) func init() { @@ -36,7 +35,6 @@ var cmdMaster = &Command{ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - mGrpcPort = cmdMaster.Flag.Int("port.grpc", 0, "grpc server listen port, default to http port + 10000") masterIp = cmdMaster.Flag.String("ip", "localhost", "master | address") masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") @@ -92,18 +90,14 @@ func runMaster(cmd *Command, args []string) bool { } go func() { - time.Sleep(100 * time.Millisecond) + // start raftServer myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers) - raftServer := weed_server.NewRaftServer(r, peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) ms.SetRaftServer(raftServer) - }() - go func() { // starting grpc server - grpcPort := *mGrpcPort - if grpcPort == 0 { - grpcPort = *mport + 10000 - } + grpcPort := *mport + 10000 grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) @@ -111,6 +105,7 @@ func runMaster(cmd *Command, args []string) bool { // Create your protocol servers. grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) master_pb.RegisterSeaweedServer(grpcS, ms) + protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort) diff --git a/weed/command/server.go b/weed/command/server.go index a9415d068..456b96435 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -1,6 +1,7 @@ package command import ( + "github.com/chrislusf/raft/protobuf" "github.com/chrislusf/seaweedfs/weed/security" "github.com/spf13/viper" "net/http" @@ -62,7 +63,6 @@ var ( serverPeers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") - masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") @@ -162,10 +162,8 @@ func runServer(cmd *Command, args []string) bool { }() } - var raftWaitForMaster sync.WaitGroup var volumeWait sync.WaitGroup - raftWaitForMaster.Add(1) volumeWait.Add(1) go func() { @@ -183,11 +181,14 @@ func runServer(cmd *Command, args []string) bool { } go func() { + // start raftServer + myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers) + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) + ms.SetRaftServer(raftServer) + // starting grpc server - grpcPort := *masterGrpcPort - if grpcPort == 0 { - grpcPort = *masterPort + 10000 - } + grpcPort := *masterPort + 10000 grpcL, err := util.NewListener(*serverIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) @@ -196,22 +197,14 @@ func runServer(cmd *Command, args []string) bool { glog.V(0).Infof("grpc config %+v", viper.Sub("grpc")) grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) master_pb.RegisterSeaweedServer(grpcS, ms) + protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort) grpcS.Serve(grpcL) }() - go func() { - raftWaitForMaster.Wait() - time.Sleep(100 * time.Millisecond) - myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers) - raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds) - ms.SetRaftServer(raftServer) - volumeWait.Done() - }() - - raftWaitForMaster.Done() + volumeWait.Done() // start http server httpS := &http.Server{Handler: r} diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index f32e8e61b..ce632b099 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -41,7 +41,7 @@ var StatusTpl = template.Must(template.New("status").Parse(` diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 7afef0b15..4be13810f 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,36 +2,35 @@ package weed_server import ( "encoding/json" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" "io/ioutil" "os" "path" "reflect" "sort" - "strings" "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" - "github.com/gorilla/mux" ) type RaftServer struct { peers []string // initial peers to join with raftServer raft.Server dataDir string - httpAddr string - router *mux.Router + serverAddr string topo *topology.Topology + *raft.GrpcServer } -func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { s := &RaftServer{ - peers: peers, - httpAddr: httpAddr, - dataDir: dataDir, - router: r, - topo: topo, + peers: peers, + serverAddr: serverAddr, + dataDir: dataDir, + topo: topo, } if glog.V(4) { @@ -41,43 +40,39 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewHTTPTransporter("/cluster", time.Second) - transporter.Transport.MaxIdleConnsPerHost = 1024 - transporter.Transport.IdleConnTimeout = time.Second - transporter.Transport.ResponseHeaderTimeout = time.Second - glog.V(0).Infof("Starting RaftServer with %v", httpAddr) + transporter := raft.NewGrpcTransporter(grpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", serverAddr) // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { + if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") + s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "") if err != nil { glog.V(0).Infoln(err) return nil } - transporter.Install(s.raftServer, s) s.raftServer.SetHeartbeatInterval(500 * time.Millisecond) s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) s.raftServer.Start() - s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - for _, peer := range s.peers { - s.raftServer.AddPeer(peer, "http://"+peer) + s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer, 19333)) } - if s.raftServer.IsLogEmpty() && isTheFirstOne(httpAddr, s.peers) { + s.GrpcServer = raft.NewGrpcServer(s.raftServer) + + if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself. glog.V(0).Infoln("Initializing new cluster") _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, + ConnectionString: util.ServerToGrpcAddress(s.serverAddr, 19333), }) if err != nil { @@ -95,7 +90,7 @@ func (s *RaftServer) Peers() (members []string) { peers := s.raftServer.Peers() for _, p := range peers { - members = append(members, strings.TrimPrefix(p.ConnectionString, "http://")) + members = append(members, p.Name) } return @@ -114,7 +109,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, } for _, p := range conf.Peers { - oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://")) + oldPeers = append(oldPeers, p.Name) } oldPeers = append(oldPeers, self) diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go deleted file mode 100644 index 627fe354e..000000000 --- a/weed/server/raft_server_handlers.go +++ /dev/null @@ -1,21 +0,0 @@ -package weed_server - -import ( - "github.com/chrislusf/seaweedfs/weed/operation" - "net/http" -) - -func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { - s.router.HandleFunc(pattern, handler) -} - -func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { - ret := operation.ClusterStatusResult{ - IsLeader: s.topo.IsLeader(), - Peers: s.Peers(), - } - if leader, e := s.topo.Leader(); e == nil { - ret.Leader = leader - } - writeJsonQuiet(w, r, http.StatusOK, ret) -} diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index b26366ae0..b989a35d1 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -83,18 +83,34 @@ func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) { hostnameAndPort := strings.Split(server, ":") if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("The server should have hostname:port format: %v", hostnameAndPort) + return "", fmt.Errorf("server should have hostname:port format: %v", hostnameAndPort) } - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) if parseErr != nil { - return "", fmt.Errorf("The server port parse error: %v", parseErr) + return "", fmt.Errorf("server port parse error: %v", parseErr) } - filerGrpcPort := int(filerPort) + 10000 + grpcPort := int(port) + 10000 if optionalGrpcPort != 0 { - filerGrpcPort = optionalGrpcPort + grpcPort = optionalGrpcPort } - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil + return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort), nil +} + +func ServerToGrpcAddress(server string, defaultGrpcPort int) (serverGrpcAddress string) { + hostnameAndPort := strings.Split(server, ":") + if len(hostnameAndPort) != 2 { + return fmt.Sprintf("%s:%d", server, defaultGrpcPort) + } + + port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + if parseErr != nil { + return fmt.Sprintf("%s:%d", hostnameAndPort[0], defaultGrpcPort) + } + + grpcPort := int(port) + 10000 + + return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) }