From f37ab2d3144c250bb01519eefed40bf348129cb5 Mon Sep 17 00:00:00 2001 From: bingoohuang Date: Thu, 24 Jan 2019 10:25:05 +0800 Subject: [PATCH] refactor code 1. make volumeSizeLimit argument more clear 2. use humanize.MiByte instead of 1024 * 1024 3. constantized NodeType together --- weed/command/filer_copy.go | 3 ++- weed/command/fix.go | 2 +- weed/command/master.go | 10 ++++---- weed/command/mount.go | 4 ++-- weed/command/mount_std.go | 5 ++-- weed/command/server.go | 9 ++++--- weed/operation/submit.go | 5 ++-- .../filer_server_handlers_write_autochunk.go | 5 ++-- weed/server/master_grpc_server.go | 2 +- weed/server/master_server.go | 12 +++++----- weed/server/volume_grpc_sync.go | 5 ++-- weed/storage/needle/compact_map_perf_test.go | 3 ++- weed/storage/types/needle_types.go | 3 ++- weed/topology/data_center.go | 2 +- weed/topology/data_node.go | 2 +- weed/topology/node.go | 13 +++++++--- weed/topology/rack.go | 2 +- weed/topology/topology.go | 2 +- weed/util/arg.go | 24 +++++++++++++++++++ weed/util/arg_test.go | 23 ++++++++++++++++++ weed/weed.go | 3 ++- 21 files changed, 100 insertions(+), 39 deletions(-) create mode 100644 weed/util/arg.go create mode 100644 weed/util/arg_test.go diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 3638bcb27..3342eeb60 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/dustin/go-humanize" "io/ioutil" "net/url" "os" @@ -143,7 +144,7 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path st } // find the chunk count - chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkSize := int64(*copy.maxMB * humanize.MiByte) chunkCount := 1 if chunkSize > 0 && fi.Size() > chunkSize { chunkCount = int(fi.Size()/chunkSize) + 1 diff --git a/weed/command/fix.go b/weed/command/fix.go index a800978c6..4cdafe946 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -81,8 +81,8 @@ func runFix(cmd *Command, args []string) bool { err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner) if err != nil { - glog.Fatalf("Export Volume File [ERROR] %s\n", err) os.Remove(indexFileName) + glog.Fatalf("Export Volume File [ERROR] %s\n", err) } return true diff --git a/weed/command/master.go b/weed/command/master.go index bd2267b9e..037ee6197 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -36,7 +36,8 @@ var ( masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + volumeSizeLimitMiB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to over-sized volumes. (Deprecated, please use volumeSizeLimit!)") + volumeSizeLimitArg = cmdMaster.Flag.String("volumeSizeLimit", "30000MB", "Master stops directing writes to over-sized volumes upper to 30000MB.(eg, 30GB, 20GiB, 500MiB and etc)") volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") @@ -64,13 +65,12 @@ func runMaster(cmd *Command, args []string) bool { if *masterWhiteListOption != "" { masterWhiteList = strings.Split(*masterWhiteListOption, ",") } - if *volumeSizeLimitMB > 30*1000 { - glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") - } + + volumeSizeLimit := util.ParseVolumeSizeLimit(*volumeSizeLimitMiB, *volumeSizeLimitArg) r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *mport, *metaFolder, - *volumeSizeLimitMB, *volumePreallocate, + volumeSizeLimit, *volumePreallocate, *mpulse, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList, *masterSecureKey, ) diff --git a/weed/command/mount.go b/weed/command/mount.go index e61f16783..37b682bc3 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -15,7 +15,7 @@ type MountOptions struct { collection *string replication *string ttlSec *int - chunkSizeLimitMB *int + chunkSizeLimitMiB *int dataCenter *string } @@ -35,7 +35,7 @@ func init() { mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") - mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") + mountOptions.chunkSizeLimitMiB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 2937b9ef1..0058ea857 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -4,6 +4,7 @@ package command import ( "fmt" + "github.com/dustin/go-humanize" "os" "os/user" "runtime" @@ -24,7 +25,7 @@ func runMount(cmd *Command, args []string) bool { fmt.Printf("Please specify the mount directory via \"-dir\"") return false } - if *mountOptions.chunkSizeLimitMB <= 0 { + if *mountOptions.chunkSizeLimitMiB <= 0 { fmt.Printf("Please specify a reasonable buffer size.") return false } @@ -95,7 +96,7 @@ func runMount(cmd *Command, args []string) bool { Collection: *mountOptions.collection, Replication: *mountOptions.replication, TtlSec: int32(*mountOptions.ttlSec), - ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMB) * 1024 * 1024, + ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMiB) * humanize.MiByte, DataCenter: *mountOptions.dataCenter, DirListingLimit: *mountOptions.dirListingLimit, EntryCacheTtl: 3 * time.Second, diff --git a/weed/command/server.go b/weed/command/server.go index ba5305a97..a5c201fd2 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -63,7 +63,8 @@ var ( masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") - masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + masterVolumeSizeLimitMiB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to over-sized volumes.(Deprecated, please use master.volumeSizeLimit!)") + masterVolumeSizeLimitArg = cmdServer.Flag.String("master.volumeSizeLimit", "30GB", "Master stops directing writes to over-sized volumes upper to 30GB.(eg, 30GB, 20GiB, 500MiB and etc)") masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") @@ -134,9 +135,7 @@ func runServer(cmd *Command, args []string) bool { folders := strings.Split(*volumeDataFolders, ",") - if *masterVolumeSizeLimitMB > 30*1000 { - glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") - } + masterVolumeSizeLimit := util.ParseVolumeSizeLimit(*masterVolumeSizeLimitMiB, *masterVolumeSizeLimitArg) if *masterMetaFolder == "" { *masterMetaFolder = folders[0] @@ -168,7 +167,7 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *masterVolumePreallocate, + masterVolumeSizeLimit, *masterVolumePreallocate, *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList, *serverSecureKey, ) diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 7a1a3085e..883f85d6d 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "github.com/dustin/go-humanize" "io" "mime" "net/url" @@ -120,8 +121,8 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret defer closer.Close() } baseName := path.Base(fi.FileName) - if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) { - chunkSize := int64(maxMB * 1024 * 1024) + if maxMB > 0 && fi.FileSize > int64(maxMB*humanize.MiByte) { + chunkSize := int64(maxMB * humanize.MiByte) chunks := fi.FileSize/chunkSize + 1 cm := ChunkManifest{ Name: baseName, diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 4b1745aaa..240947c52 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "github.com/dustin/go-humanize" "io" "io/ioutil" "net/http" @@ -37,7 +38,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica } glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") - chunkSize := 1024 * 1024 * maxMB + chunkSize := maxMB * humanize.MiByte contentLength := int64(0) if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { @@ -82,7 +83,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte var fileChunks []*filer_pb.FileChunk totalBytesRead := int64(0) - tmpBufferSize := int32(1024 * 1024) + tmpBufferSize := int32(1 * humanize.MiByte) tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow chunkBufOffset := int32(0) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 93dce59d8..58238e2bc 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -66,7 +66,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ int(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + VolumeSizeLimit: ms.volumeSizeLimit, SecretKey: string(ms.guard.SecretKey), }); err != nil { return err diff --git a/weed/server/master_server.go b/weed/server/master_server.go index f22925e56..a78c8e2bc 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -20,7 +20,7 @@ import ( type MasterServer struct { port int metaFolder string - volumeSizeLimitMB uint + volumeSizeLimit uint64 preallocate int64 pulseSeconds int defaultReplicaPlacement string @@ -39,7 +39,7 @@ type MasterServer struct { } func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, + volumeSizeLimit uint64, preallocate bool, pulseSeconds int, defaultReplicaPlacement string, @@ -50,11 +50,11 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, var preallocateSize int64 if preallocate { - preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + preallocateSize = int64(volumeSizeLimit) } ms := &MasterServer{ port: port, - volumeSizeLimitMB: volumeSizeLimitMB, + volumeSizeLimit: volumeSizeLimit, preallocate: preallocateSize, pulseSeconds: pulseSeconds, defaultReplicaPlacement: defaultReplicaPlacement, @@ -63,9 +63,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() - ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, volumeSizeLimit, pulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") + glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimit) ms.guard = security.NewGuard(whiteList, secureKey) diff --git a/weed/server/volume_grpc_sync.go b/weed/server/volume_grpc_sync.go index 5f56ec17d..d7ed96437 100644 --- a/weed/server/volume_grpc_sync.go +++ b/weed/server/volume_grpc_sync.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/dustin/go-humanize" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -40,7 +41,7 @@ func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexReq glog.V(2).Infof("sync volume %d index", req.VolumdId) } - const blockSizeLimit = 1024 * 1024 * 2 + const blockSizeLimit = 2 * humanize.MiByte for i := 0; i < len(content); i += blockSizeLimit { blockSize := len(content) - i if blockSize > blockSizeLimit { @@ -85,7 +86,7 @@ func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataReque glog.Errorf("sync volume %d data: %v", req.VolumdId, err) } - const blockSizeLimit = 1024 * 1024 * 2 + const blockSizeLimit = 2 * humanize.MiByte for i := 0; i < len(content); i += blockSizeLimit { blockSize := len(content) - i if blockSize > blockSizeLimit { diff --git a/weed/storage/needle/compact_map_perf_test.go b/weed/storage/needle/compact_map_perf_test.go index cd21cc184..46469e3ba 100644 --- a/weed/storage/needle/compact_map_perf_test.go +++ b/weed/storage/needle/compact_map_perf_test.go @@ -2,6 +2,7 @@ package needle import ( "fmt" + "github.com/dustin/go-humanize" "log" "os" "runtime" @@ -83,5 +84,5 @@ func PrintMemUsage() { fmt.Printf("\tNumGC = %v", m.NumGC) } func bToMb(b uint64) uint64 { - return b / 1024 / 1024 + return b / humanize.MiByte } diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index ce4e601e4..2239044f6 100644 --- a/weed/storage/types/needle_types.go +++ b/weed/storage/types/needle_types.go @@ -3,6 +3,7 @@ package types import ( "fmt" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/dustin/go-humanize" "math" "strconv" ) @@ -16,7 +17,7 @@ const ( NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize TimestampSize = 8 // int64 size NeedlePaddingSize = 8 - MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 + MaxPossibleVolumeSize = 4 * humanize.GiByte * 8 TombstoneFileSize = math.MaxUint32 CookieSize = 4 ) diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go index bcf2dfd31..5d0471a3a 100644 --- a/weed/topology/data_center.go +++ b/weed/topology/data_center.go @@ -7,7 +7,7 @@ type DataCenter struct { func NewDataCenter(id string) *DataCenter { dc := &DataCenter{} dc.id = NodeId(id) - dc.nodeType = "DataCenter" + dc.nodeType = NodeTypeDataCenter dc.children = make(map[NodeId]Node) dc.NodeImpl.value = dc return dc diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 6ea6d3938..558c63f40 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -20,7 +20,7 @@ type DataNode struct { func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) - s.nodeType = "DataNode" + s.nodeType = NodeTypeDataNode s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) s.NodeImpl.value = s return s diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..29fdcf86c 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -53,6 +53,13 @@ type NodeImpl struct { value interface{} } +const ( + NodeTypeTopology = "Topology" + NodeTypeDataCenter = "DataCenter" + NodeTypeRack = "Rack" + NodeTypeDataNode = "DataNode" +) + // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { candidates := make([]Node, 0, len(n.children)) @@ -109,13 +116,13 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d } func (n *NodeImpl) IsDataNode() bool { - return n.nodeType == "DataNode" + return n.nodeType == NodeTypeDataNode } func (n *NodeImpl) IsRack() bool { - return n.nodeType == "Rack" + return n.nodeType == NodeTypeRack } func (n *NodeImpl) IsDataCenter() bool { - return n.nodeType == "DataCenter" + return n.nodeType == NodeTypeDataCenter } func (n *NodeImpl) String() string { if n.parent != nil { diff --git a/weed/topology/rack.go b/weed/topology/rack.go index a48d64323..de283e1a6 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -12,7 +12,7 @@ type Rack struct { func NewRack(id string) *Rack { r := &Rack{} r.id = NodeId(id) - r.nodeType = "Rack" + r.nodeType = NodeTypeRack r.children = make(map[NodeId]Node) r.NodeImpl.value = r return r diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 4242bfa05..d43a4ea29 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -33,7 +33,7 @@ type Topology struct { func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { t := &Topology{} t.id = NodeId(id) - t.nodeType = "Topology" + t.nodeType = NodeTypeTopology t.NodeImpl.value = t t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() diff --git a/weed/util/arg.go b/weed/util/arg.go new file mode 100644 index 000000000..3794a3b6b --- /dev/null +++ b/weed/util/arg.go @@ -0,0 +1,24 @@ +package util + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/dustin/go-humanize" +) + +func ParseVolumeSizeLimit(volumeSizeLimitMiB uint, volumeSizeLimitArg string) uint64 { + volumeSizeLimit := uint64(volumeSizeLimitMiB) * humanize.MiByte + if volumeSizeLimitArg != "" { + var err error + volumeSizeLimit, err = humanize.ParseBytes(volumeSizeLimitArg) + Assert(err != nil, "Parse volumeSizeLimit %s : %s", volumeSizeLimitArg, err) + } + + Assert(volumeSizeLimit > 30*1000*humanize.MiByte, "volumeSizeLimit should be smaller than 30000MB") + return volumeSizeLimit +} + +func Assert(condition bool, format string, args ...interface{}) { + if condition { + glog.Fatalf(format, args...) + } +} diff --git a/weed/util/arg_test.go b/weed/util/arg_test.go new file mode 100644 index 000000000..f27a08260 --- /dev/null +++ b/weed/util/arg_test.go @@ -0,0 +1,23 @@ +package util + +import ( + "github.com/dustin/go-humanize" + "testing" +) + +func TestParseVolumeSizeLimit(t *testing.T) { + volumeSizeLimit := ParseVolumeSizeLimit(10, "") + if volumeSizeLimit != 10*humanize.MiByte { + t.Fail() + } + + volumeSizeLimit = ParseVolumeSizeLimit(10, "11GiB") + if volumeSizeLimit != 11*humanize.GiByte { + t.Fail() + } + + volumeSizeLimit = ParseVolumeSizeLimit(10, "11811160064") + if volumeSizeLimit != 11*humanize.GiByte { + t.Fail() + } +} diff --git a/weed/weed.go b/weed/weed.go index 340da6625..7680d6993 100644 --- a/weed/weed.go +++ b/weed/weed.go @@ -6,6 +6,7 @@ package main import ( "flag" "fmt" + "github.com/dustin/go-humanize" "io" "math/rand" "os" @@ -37,7 +38,7 @@ func setExitStatus(n int) { } func main() { - glog.MaxSize = 1024 * 1024 * 32 + glog.MaxSize = 32 * humanize.MiByte rand.Seed(time.Now().UnixNano()) flag.Usage = usage flag.Parse()