Browse Source

refactoring

pull/1023/head
Chris Lu 5 years ago
parent
commit
2c6cf72e73
  1. 42
      weed/command/master.go
  2. 65
      weed/command/server.go

42
weed/command/master.go

@ -72,8 +72,6 @@ var cmdMaster = &Command{
var ( var (
masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file")
masterWhiteList []string
) )
func runMaster(cmd *Command, args []string) bool { func runMaster(cmd *Command, args []string) bool {
@ -87,6 +85,8 @@ func runMaster(cmd *Command, args []string) bool {
if err := util.TestFolderWritable(*m.metaFolder); err != nil { if err := util.TestFolderWritable(*m.metaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
} }
var masterWhiteList []string
if *m.whiteList != "" { if *m.whiteList != "" {
masterWhiteList = strings.Split(*m.whiteList, ",") masterWhiteList = strings.Split(*m.whiteList, ",")
} }
@ -94,32 +94,32 @@ func runMaster(cmd *Command, args []string) bool {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
} }
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, m.toMasterOption(masterWhiteList))
listeningAddress := *m.ipBind + ":" + strconv.Itoa(*m.port)
startMaster(m, masterWhiteList)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
return true
}
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList))
listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0) masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil { if e != nil {
glog.Fatalf("Master startup error: %v", e) glog.Fatalf("Master startup error: %v", e)
} }
go func() {
// start raftServer // start raftServer
myMasterAddress, peers := checkPeers(*m.ip, *m.port, *m.peers)
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
peers, myMasterAddress, *m.metaFolder, ms.Topo, *m.pulseSeconds)
peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds)
if raftServer == nil { if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *m.metaFolder)
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
} }
ms.SetRaftServer(raftServer) ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
// starting grpc server // starting grpc server
grpcPort := *m.port + 10000
grpcL, err := util.NewListener(*m.ipBind+":"+strconv.Itoa(grpcPort), 0)
grpcPort := *masterOption.port + 10000
grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0)
if err != nil { if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
} }
@ -128,18 +128,14 @@ func runMaster(cmd *Command, args []string) bool {
master_pb.RegisterSeaweedServer(grpcS, ms) master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer) protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS) reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *m.ipBind, grpcPort)
grpcS.Serve(grpcL)
}()
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort)
go grpcS.Serve(grpcL)
// start http server // start http server
httpS := &http.Server{Handler: r} httpS := &http.Server{Handler: r}
if err := httpS.Serve(masterListener); err != nil {
glog.Fatalf("master server failed to serve: %v", err)
}
go httpS.Serve(masterListener)
return true
select {}
} }
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {

65
weed/command/server.go

@ -2,25 +2,15 @@ package command
import ( import (
"fmt" "fmt"
"net/http"
"os" "os"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/chrislusf/raft/protobuf"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
) )
type ServerOptions struct { type ServerOptions struct {
@ -198,59 +188,12 @@ func runServer(cmd *Command, args []string) bool {
}() }()
} }
var volumeWait sync.WaitGroup
volumeWait.Add(1)
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOptions.toMasterOption(serverWhiteList))
glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterOptions.port)
masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterOptions.port), 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
go func() {
// start raftServer
myMasterAddress, peers := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
peers, myMasterAddress, *masterOptions.metaFolder, ms.Topo, *masterOptions.pulseSeconds)
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
// starting grpc server
grpcPort := *masterOptions.port + 10000
grpcL, err := util.NewListener(*serverBindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
glog.V(1).Infof("grpc config %+v", viper.Sub("grpc"))
grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *serverIp, grpcPort)
grpcS.Serve(grpcL)
}()
volumeWait.Done()
// start http server
httpS := &http.Server{Handler: r}
if err := httpS.Serve(masterListener); err != nil {
glog.Fatalf("master server failed to serve: %v", err)
// start volume server
{
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
} }
}()
volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
startMaster(masterOptions, serverWhiteList)
return true return true
} }
Loading…
Cancel
Save