From f16375621f25995837868bd77e64a7d0da2ac68d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 23 Jun 2019 03:08:27 -0700 Subject: [PATCH] big refactoring --- weed/command/master.go | 108 ++++++++++++-------- weed/command/server.go | 67 ++++++------ weed/command/volume.go | 7 +- weed/server/master_grpc_server.go | 6 +- weed/server/master_grpc_server_volume.go | 8 +- weed/server/master_server.go | 70 ++++++------- weed/server/master_server_handlers_admin.go | 10 +- 7 files changed, 145 insertions(+), 131 deletions(-) diff --git a/weed/command/master.go b/weed/command/master.go index d7c0356d3..9c03cfaaa 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -18,8 +18,43 @@ import ( "google.golang.org/grpc/reflection" ) +var ( + m MasterOptions +) + +type MasterOptions struct { + port *int + ip *string + ipBind *string + metaFolder *string + peers *string + volumeSizeLimitMB *uint + volumePreallocate *bool + pulseSeconds *int + defaultReplication *string + garbageThreshold *float64 + whiteList *string + disableHttp *bool + metricsAddress *string + metricsIntervalSec *int +} + func init() { cmdMaster.Run = runMaster // break init cycle + m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") + m.ip = cmdMaster.Flag.String("ip", "localhost", "master | address") + m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") + m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") + m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") + m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") + m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") + m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") + m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") + m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") } var cmdMaster = &Command{ @@ -35,24 +70,8 @@ var cmdMaster = &Command{ } var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - masterIp = cmdMaster.Flag.String("ip", "localhost", "master | address") - masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") - volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") - // mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") - masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") - masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") - masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") - masterMetricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address") - masterMetricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") + masterMemProfile = cmdMaster.Flag.String("memprofile", "", "memory profile output file") masterWhiteList []string ) @@ -62,32 +81,23 @@ func runMaster(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) + runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*masterCpuProfile, *masterMemProfile) - if err := util.TestFolderWritable(*metaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) + if err := util.TestFolderWritable(*m.metaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) } - if *masterWhiteListOption != "" { - masterWhiteList = strings.Split(*masterWhiteListOption, ",") + if *m.whiteList != "" { + masterWhiteList = strings.Split(*m.whiteList, ",") } - if *volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { + if *m.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, *mport, *metaFolder, - *volumeSizeLimitMB, *volumePreallocate, - *mpulse, *defaultReplicaPlacement, *garbageThreshold, - masterWhiteList, - *disableHttp, - *masterMetricsAddress, *masterMetricsIntervalSec, - ) + ms := weed_server.NewMasterServer(r, m.toMasterOption(masterWhiteList)) - listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) + listeningAddress := *m.ipBind + ":" + strconv.Itoa(*m.port) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) @@ -98,18 +108,18 @@ func runMaster(cmd *Command, args []string) bool { go func() { // start raftServer - myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers) + myMasterAddress, peers := checkPeers(*m.ip, *m.port, *m.peers) raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), - peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) + peers, myMasterAddress, *m.metaFolder, ms.Topo, *m.pulseSeconds) if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *metaFolder) + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *m.metaFolder) } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") // starting grpc server - grpcPort := *mport + 10000 - grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *m.port + 10000 + grpcL, err := util.NewListener(*m.ipBind+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } @@ -119,7 +129,7 @@ func runMaster(cmd *Command, args []string) bool { protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) - glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *m.ipBind, grpcPort) grpcS.Serve(grpcL) }() @@ -155,3 +165,19 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st } return } + +func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { + return &weed_server.MasterOption{ + Port: *m.port, + MetaFolder: *m.metaFolder, + VolumeSizeLimitMB: *m.volumeSizeLimitMB, + VolumePreallocate: *m.volumePreallocate, + PulseSeconds: *m.pulseSeconds, + DefaultReplicaPlacement: *m.defaultReplication, + GarbageThreshold: *m.garbageThreshold, + WhiteList: whiteList, + DisableHttp: *m.disableHttp, + MetricsAddress: *m.metricsAddress, + MetricsIntervalSec: *m.metricsIntervalSec, + } +} \ No newline at end of file diff --git a/weed/command/server.go b/weed/command/server.go index 3799bedeb..04e307835 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -30,6 +30,7 @@ type ServerOptions struct { var ( serverOptions ServerOptions + masterOptions MasterOptions filerOptions FilerOptions s3Options S3Options ) @@ -57,21 +58,13 @@ var cmdServer = &Command{ var ( serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") - serverPeers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") - serverGarbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") serverMetricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address") serverMetricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") - masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") - masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") - masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") - masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") - masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") @@ -83,6 +76,16 @@ var ( func init() { serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") + + masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") + masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") + masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") + masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") + masterOptions.pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "000", "Default replication type if not specified.") + masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") + filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") @@ -130,49 +133,52 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - master := *serverIp + ":" + strconv.Itoa(*masterPort) + master := *serverIp + ":" + strconv.Itoa(*masterOptions.port) + masterOptions.ip = serverIp + masterOptions.ipBind = serverBindIp filerOptions.masters = &master filerOptions.ip = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp serverOptions.v.masters = &master serverOptions.v.idleConnectionTimeout = serverTimeout - serverOptions.v.maxCpu = serverMaxCpu serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack serverOptions.v.pulseSeconds = pulseSeconds + masterOptions.whiteList = serverWhiteListOption + filerOptions.dataCenter = serverDataCenter filerOptions.disableHttp = serverDisableHttp + masterOptions.disableHttp = serverDisableHttp filerOptions.metricsAddress = serverMetricsAddress filerOptions.metricsIntervalSec = serverMetricsIntervalSec + masterOptions.metricsAddress = serverMetricsAddress + masterOptions.metricsIntervalSec = serverMetricsIntervalSec filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) s3Options.filer = &filerAddress if *filerOptions.defaultReplicaPlacement == "" { - *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement + *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication } - if *serverMaxCpu < 1 { - *serverMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*serverMaxCpu) + runtime.GOMAXPROCS(runtime.NumCPU()) folders := strings.Split(*volumeDataFolders, ",") - if *masterVolumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { + if *masterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") } - if *masterMetaFolder == "" { - *masterMetaFolder = folders[0] + if *masterOptions.metaFolder == "" { + *masterOptions.metaFolder = folders[0] } - if err := util.TestFolderWritable(*masterMetaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) + if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err) } - filerOptions.defaultLevelDbDirectory = masterMetaFolder + filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder if *serverWhiteListOption != "" { serverWhiteList = strings.Split(*serverWhiteListOption, ",") @@ -202,29 +208,24 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *masterVolumePreallocate, - *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold, - serverWhiteList, *serverDisableHttp, - *serverMetricsAddress, *serverMetricsIntervalSec, - ) - - glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterPort) - masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0) + ms := weed_server.NewMasterServer(r, masterOptions.toMasterOption(serverWhiteList)) + + glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterOptions.port) + masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterOptions.port), 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } go func() { // start raftServer - myMasterAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers) + myMasterAddress, peers := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), - peers, myMasterAddress, *masterMetaFolder, ms.Topo, *pulseSeconds) + peers, myMasterAddress, *masterOptions.metaFolder, ms.Topo, *masterOptions.pulseSeconds) ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") // starting grpc server - grpcPort := *masterPort + 10000 + grpcPort := *masterOptions.port + 10000 grpcL, err := util.NewListener(*serverBindIp+":"+strconv.Itoa(grpcPort), 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) diff --git a/weed/command/volume.go b/weed/command/volume.go index c775ac5cf..3c1aa2b50 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -35,7 +35,6 @@ type VolumeServerOptions struct { masters *string pulseSeconds *int idleConnectionTimeout *int - maxCpu *int dataCenter *string rack *string whiteList []string @@ -57,7 +56,6 @@ func init() { v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") - v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") @@ -86,10 +84,7 @@ func runVolume(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - if *v.maxCpu < 1 { - *v.maxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*v.maxCpu) + runtime.GOMAXPROCS(runtime.NumCPU()) util.SetupProfiling(*v.cpuProfile, *v.memProfile) v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index b766b8d7d..1a17327a0 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -70,7 +70,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { return err } @@ -157,8 +157,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if err := stream.Send(&master_pb.HeartbeatResponse{ Leader: newLeader, - MetricsAddress: ms.metricsAddress, - MetricsIntervalSeconds: uint32(ms.metricsIntervalSec), + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), }); err != nil { return err } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 2aaf7b891..17903f020 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -50,7 +50,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } if req.Replication == "" { - req.Replication = ms.defaultReplicaPlacement + req.Replication = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) if err != nil { @@ -65,7 +65,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest Collection: req.Collection, ReplicaPlacement: replicaPlacement, Ttl: ttl, - Prealloacte: ms.preallocate, + Prealloacte: ms.preallocateSize, DataCenter: req.DataCenter, Rack: req.Rack, DataNode: req.DataNode, @@ -105,7 +105,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic } if req.Replication == "" { - req.Replication = ms.defaultReplicaPlacement + req.Replication = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) if err != nil { @@ -136,7 +136,7 @@ func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeLis resp := &master_pb.VolumeListResponse{ TopologyInfo: ms.Topo.ToTopologyInfo(), - VolumeSizeLimitMb: uint64(ms.volumeSizeLimitMB), + VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB), } return resp, nil diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 180007df7..3689b5495 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -25,17 +25,25 @@ import ( "github.com/spf13/viper" ) +type MasterOption struct { + Port int + MetaFolder string + VolumeSizeLimitMB uint + VolumePreallocate bool + PulseSeconds int + DefaultReplicaPlacement string + GarbageThreshold float64 + WhiteList []string + DisableHttp bool + MetricsAddress string + MetricsIntervalSec int +} + type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - preallocate int64 - pulseSeconds int - defaultReplicaPlacement string - garbageThreshold float64 - guard *security.Guard - metricsAddress string - metricsIntervalSec int + option *MasterOption + guard *security.Guard + + preallocateSize int64 Topo *topology.Topology vg *topology.VolumeGrowth @@ -50,17 +58,7 @@ type MasterServer struct { grpcDialOpiton grpc.DialOption } -func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, - preallocate bool, - pulseSeconds int, - defaultReplicaPlacement string, - garbageThreshold float64, - whiteList []string, - disableHttp bool, - metricsAddress string, - metricsIntervalSec int, -) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -72,30 +70,24 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") var preallocateSize int64 - if preallocate { - preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + if option.VolumePreallocate { + preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } ms := &MasterServer{ - port: port, - volumeSizeLimitMB: volumeSizeLimitMB, - preallocate: preallocateSize, - pulseSeconds: pulseSeconds, - defaultReplicaPlacement: defaultReplicaPlacement, - garbageThreshold: garbageThreshold, - clientChans: make(map[string]chan *master_pb.VolumeLocation), - grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), - metricsAddress: metricsAddress, - metricsIntervalSec: metricsIntervalSec, + option: option, + preallocateSize: preallocateSize, + clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() - ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") + glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") - ms.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) - if !disableHttp { + if !ms.option.DisableHttp { handleStaticResources2(r) r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) r.HandleFunc("/ui/index.html", ms.uiStatusHandler) @@ -113,7 +105,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) } - ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate) + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) ms.startAdminScripts() @@ -185,7 +177,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines := strings.Split(adminScripts, "\n") - masterAddress := "localhost:" + strconv.Itoa(ms.port) + masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 244098515..343bcb8da 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -47,7 +47,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { gcString := r.FormValue("garbageThreshold") - gcThreshold := ms.garbageThreshold + gcThreshold := ms.option.GarbageThreshold if gcString != "" { var err error gcThreshold, err = strconv.ParseFloat(gcString, 32) @@ -57,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate) + ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -119,7 +119,7 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { if r.Host != "" { return r.Host } - return "localhost:" + strconv.Itoa(ms.port) + return "localhost:" + strconv.Itoa(ms.option.Port) } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { @@ -142,7 +142,7 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { replicationString := r.FormValue("replication") if replicationString == "" { - replicationString = ms.defaultReplicaPlacement + replicationString = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) if err != nil { @@ -152,7 +152,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } - preallocate := ms.preallocate + preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) if err != nil {