From da6154b29c71c5f12d482ffd773e77d492b89371 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 11 Oct 2018 00:04:31 -0700 Subject: [PATCH] refactor volume server to startVolumeServer() --- weed/command/server.go | 112 ++++++++--------------------------------- weed/command/volume.go | 28 +++++++---- 2 files changed, 40 insertions(+), 100 deletions(-) diff --git a/weed/command/server.go b/weed/command/server.go index 7cd245b94..875feebad 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -13,8 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "github.com/soheilhy/cmux" "google.golang.org/grpc/reflection" @@ -22,6 +21,7 @@ import ( type ServerOptions struct { cpuprofile *string + v VolumeServerOptions } var ( @@ -66,15 +66,9 @@ var ( masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") - volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") - volumePublicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") - volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") - volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") - volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") - volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") serverWhiteList []string @@ -91,6 +85,14 @@ func init() { filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") + + serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") + serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") + serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") + serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") + serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + } func runServer(cmd *Command, args []string) bool { @@ -110,6 +112,14 @@ func runServer(cmd *Command, args []string) bool { master := *serverIp + ":" + strconv.Itoa(*masterPort) filerOptions.ip = serverIp + serverOptions.v.ip = serverIp + serverOptions.v.bindIp = serverBindIp + serverOptions.v.masters = &master + serverOptions.v.idleConnectionTimeout = serverTimeout + serverOptions.v.maxCpu = serverMaxCpu + serverOptions.v.dataCenter = serverDataCenter + serverOptions.v.rack = serverRack + serverOptions.v.pulseSeconds = pulseSeconds filerOptions.dataCenter = serverDataCenter @@ -117,33 +127,12 @@ func runServer(cmd *Command, args []string) bool { *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement } - if *volumePublicPort == 0 { - *volumePublicPort = *volumePort - } - if *serverMaxCpu < 1 { *serverMaxCpu = runtime.NumCPU() } runtime.GOMAXPROCS(*serverMaxCpu) folders := strings.Split(*volumeDataFolders, ",") - maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",") - var maxCounts []int - for _, maxString := range maxCountStrings { - if max, e := strconv.Atoi(maxString); e == nil { - maxCounts = append(maxCounts, max) - } else { - glog.Fatalf("The max specified in -max not a valid number %s", maxString) - } - } - if len(folders) != len(maxCounts) { - glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts)) - } - for _, folder := range folders { - if err := util.TestFolderWritable(folder); err != nil { - glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) - } - } if *masterVolumeSizeLimitMB > 30*1000 { glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") @@ -179,7 +168,7 @@ func runServer(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, *masterVolumeSizeLimitMB, *masterVolumePreallocate, - *volumePulse, *masterDefaultReplicaPlacement, *serverGarbageThreshold, + *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList, *serverSecureKey, ) @@ -193,7 +182,7 @@ func runServer(cmd *Command, args []string) bool { raftWaitForMaster.Wait() time.Sleep(100 * time.Millisecond) myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers) - raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse) + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds) ms.SetRaftServer(raftServer) volumeWait.Done() }() @@ -224,65 +213,8 @@ func runServer(cmd *Command, args []string) bool { volumeWait.Wait() time.Sleep(100 * time.Millisecond) - if *volumePublicPort == 0 { - *volumePublicPort = *volumePort - } - if *volumeServerPublicUrl == "" { - *volumeServerPublicUrl = *serverIp + ":" + strconv.Itoa(*volumePublicPort) - } - isSeperatedPublicPort := *volumePublicPort != *volumePort - volumeMux := http.NewServeMux() - publicVolumeMux := volumeMux - if isSeperatedPublicPort { - publicVolumeMux = http.NewServeMux() - } - volumeNeedleMapKind := storage.NeedleMapInMemory - switch *volumeIndexType { - case "leveldb": - volumeNeedleMapKind = storage.NeedleMapLevelDb - case "boltdb": - volumeNeedleMapKind = storage.NeedleMapBoltDb - case "btree": - volumeNeedleMapKind = storage.NeedleMapBtree - } - volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, - *serverIp, *volumePort, *volumeServerPublicUrl, - folders, maxCounts, - volumeNeedleMapKind, - []string{master}, *volumePulse, *serverDataCenter, *serverRack, - serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, - ) - - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) - volumeListener, eListen := util.NewListener( - *serverBindIp+":"+strconv.Itoa(*volumePort), - time.Duration(*serverTimeout)*time.Second, - ) - if eListen != nil { - glog.Fatalf("Volume server listener error: %v", eListen) - } - if isSeperatedPublicPort { - publicListeningAddress := *serverIp + ":" + strconv.Itoa(*volumePublicPort) - glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*serverTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } - go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } - }() - } - util.OnInterrupt(func() { - volumeServer.Shutdown() - pprof.StopCPUProfile() - }) - - if e := http.Serve(volumeListener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve:%v", e) - } + serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) return true } diff --git a/weed/command/volume.go b/weed/command/volume.go index 407c39eb1..df8d842dc 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -1,17 +1,17 @@ package command import ( - "net/http" "os" "runtime" "strconv" "strings" - "time" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" + "net/http" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/server" + "time" + "runtime/pprof" ) var ( @@ -81,9 +81,16 @@ func runVolume(cmd *Command, args []string) bool { runtime.GOMAXPROCS(*v.maxCpu) util.SetupProfiling(*v.cpuProfile, *v.memProfile) + v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) + + return true +} + +func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { + //Set multiple folders and each folder's max volume count limit' - v.folders = strings.Split(*volumeFolders, ",") - maxCountStrings := strings.Split(*maxVolumeCounts, ",") + v.folders = strings.Split(volumeFolders, ",") + maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { if max, e := strconv.Atoi(maxString); e == nil { v.folderMaxLimits = append(v.folderMaxLimits, max) @@ -101,8 +108,8 @@ func runVolume(cmd *Command, args []string) bool { } //security related white list configuration - if *volumeWhiteListOption != "" { - v.whiteList = strings.Split(*volumeWhiteListOption, ",") + if volumeWhiteListOption != "" { + v.whiteList = strings.Split(volumeWhiteListOption, ",") } if *v.ip == "" { @@ -166,10 +173,11 @@ func runVolume(cmd *Command, args []string) bool { util.OnInterrupt(func() { volumeServer.Shutdown() + pprof.StopCPUProfile() }) if e := http.Serve(listener, volumeMux); e != nil { glog.Fatalf("Volume server fail to serve: %v", e) } - return true + }