Browse Source

refactor volume server to startVolumeServer()

pull/753/head
Chris Lu 6 years ago
parent
commit
da6154b29c
  1. 112
      weed/command/server.go
  2. 28
      weed/command/volume.go

112
weed/command/server.go

@ -13,8 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/soheilhy/cmux" "github.com/soheilhy/cmux"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
@ -22,6 +21,7 @@ import (
type ServerOptions struct { type ServerOptions struct {
cpuprofile *string cpuprofile *string
v VolumeServerOptions
} }
var ( var (
@ -66,15 +66,9 @@ var (
masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
volumePublicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
serverWhiteList []string serverWhiteList []string
@ -91,6 +85,14 @@ func init() {
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
} }
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
@ -110,6 +112,14 @@ func runServer(cmd *Command, args []string) bool {
master := *serverIp + ":" + strconv.Itoa(*masterPort) master := *serverIp + ":" + strconv.Itoa(*masterPort)
filerOptions.ip = serverIp filerOptions.ip = serverIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
serverOptions.v.masters = &master
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.maxCpu = serverMaxCpu
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
serverOptions.v.pulseSeconds = pulseSeconds
filerOptions.dataCenter = serverDataCenter filerOptions.dataCenter = serverDataCenter
@ -117,33 +127,12 @@ func runServer(cmd *Command, args []string) bool {
*filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement
} }
if *volumePublicPort == 0 {
*volumePublicPort = *volumePort
}
if *serverMaxCpu < 1 { if *serverMaxCpu < 1 {
*serverMaxCpu = runtime.NumCPU() *serverMaxCpu = runtime.NumCPU()
} }
runtime.GOMAXPROCS(*serverMaxCpu) runtime.GOMAXPROCS(*serverMaxCpu)
folders := strings.Split(*volumeDataFolders, ",") folders := strings.Split(*volumeDataFolders, ",")
maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",")
var maxCounts []int
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
maxCounts = append(maxCounts, max)
} else {
glog.Fatalf("The max specified in -max not a valid number %s", maxString)
}
}
if len(folders) != len(maxCounts) {
glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
}
for _, folder := range folders {
if err := util.TestFolderWritable(folder); err != nil {
glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
}
}
if *masterVolumeSizeLimitMB > 30*1000 { if *masterVolumeSizeLimitMB > 30*1000 {
glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000")
@ -179,7 +168,7 @@ func runServer(cmd *Command, args []string) bool {
r := mux.NewRouter() r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
*masterVolumeSizeLimitMB, *masterVolumePreallocate, *masterVolumeSizeLimitMB, *masterVolumePreallocate,
*volumePulse, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
*pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
serverWhiteList, *serverSecureKey, serverWhiteList, *serverSecureKey,
) )
@ -193,7 +182,7 @@ func runServer(cmd *Command, args []string) bool {
raftWaitForMaster.Wait() raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers) myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers)
raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse)
raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds)
ms.SetRaftServer(raftServer) ms.SetRaftServer(raftServer)
volumeWait.Done() volumeWait.Done()
}() }()
@ -224,65 +213,8 @@ func runServer(cmd *Command, args []string) bool {
volumeWait.Wait() volumeWait.Wait()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if *volumePublicPort == 0 {
*volumePublicPort = *volumePort
}
if *volumeServerPublicUrl == "" {
*volumeServerPublicUrl = *serverIp + ":" + strconv.Itoa(*volumePublicPort)
}
isSeperatedPublicPort := *volumePublicPort != *volumePort
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
if isSeperatedPublicPort {
publicVolumeMux = http.NewServeMux()
}
volumeNeedleMapKind := storage.NeedleMapInMemory
switch *volumeIndexType {
case "leveldb":
volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb
case "btree":
volumeNeedleMapKind = storage.NeedleMapBtree
}
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *volumeServerPublicUrl,
folders, maxCounts,
volumeNeedleMapKind,
[]string{master}, *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
volumeListener, eListen := util.NewListener(
*serverBindIp+":"+strconv.Itoa(*volumePort),
time.Duration(*serverTimeout)*time.Second,
)
if eListen != nil {
glog.Fatalf("Volume server listener error: %v", eListen)
}
if isSeperatedPublicPort {
publicListeningAddress := *serverIp + ":" + strconv.Itoa(*volumePublicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*serverTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
}
go func() {
if e := http.Serve(publicListener, publicVolumeMux); e != nil {
glog.Fatalf("Volume server fail to serve public: %v", e)
}
}()
}
util.OnInterrupt(func() {
volumeServer.Shutdown()
pprof.StopCPUProfile()
})
if e := http.Serve(volumeListener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve:%v", e)
}
serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
return true return true
} }

28
weed/command/volume.go

@ -1,17 +1,17 @@
package command package command
import ( import (
"net/http"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/server"
"time"
"runtime/pprof"
) )
var ( var (
@ -81,9 +81,16 @@ func runVolume(cmd *Command, args []string) bool {
runtime.GOMAXPROCS(*v.maxCpu) runtime.GOMAXPROCS(*v.maxCpu)
util.SetupProfiling(*v.cpuProfile, *v.memProfile) util.SetupProfiling(*v.cpuProfile, *v.memProfile)
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption)
return true
}
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
//Set multiple folders and each folder's max volume count limit' //Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(*volumeFolders, ",")
maxCountStrings := strings.Split(*maxVolumeCounts, ",")
v.folders = strings.Split(volumeFolders, ",")
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings { for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil { if max, e := strconv.Atoi(maxString); e == nil {
v.folderMaxLimits = append(v.folderMaxLimits, max) v.folderMaxLimits = append(v.folderMaxLimits, max)
@ -101,8 +108,8 @@ func runVolume(cmd *Command, args []string) bool {
} }
//security related white list configuration //security related white list configuration
if *volumeWhiteListOption != "" {
v.whiteList = strings.Split(*volumeWhiteListOption, ",")
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
} }
if *v.ip == "" { if *v.ip == "" {
@ -166,10 +173,11 @@ func runVolume(cmd *Command, args []string) bool {
util.OnInterrupt(func() { util.OnInterrupt(func() {
volumeServer.Shutdown() volumeServer.Shutdown()
pprof.StopCPUProfile()
}) })
if e := http.Serve(listener, volumeMux); e != nil { if e := http.Serve(listener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e) glog.Fatalf("Volume server fail to serve: %v", e)
} }
return true
} }
Loading…
Cancel
Save