Browse Source

refactor code

1. make volumeSizeLimit argument more clear
2. use humanize.MiByte instead of 1024 * 1024
3. constantized NodeType together
pull/848/head
bingoohuang 7 years ago
parent
commit
f37ab2d314
  1. 3
      weed/command/filer_copy.go
  2. 2
      weed/command/fix.go
  3. 10
      weed/command/master.go
  4. 4
      weed/command/mount.go
  5. 5
      weed/command/mount_std.go
  6. 9
      weed/command/server.go
  7. 5
      weed/operation/submit.go
  8. 5
      weed/server/filer_server_handlers_write_autochunk.go
  9. 2
      weed/server/master_grpc_server.go
  10. 12
      weed/server/master_server.go
  11. 5
      weed/server/volume_grpc_sync.go
  12. 3
      weed/storage/needle/compact_map_perf_test.go
  13. 3
      weed/storage/types/needle_types.go
  14. 2
      weed/topology/data_center.go
  15. 2
      weed/topology/data_node.go
  16. 13
      weed/topology/node.go
  17. 2
      weed/topology/rack.go
  18. 2
      weed/topology/topology.go
  19. 24
      weed/util/arg.go
  20. 23
      weed/util/arg_test.go
  21. 3
      weed/weed.go

3
weed/command/filer_copy.go

@ -2,6 +2,7 @@ package command
import (
"fmt"
"github.com/dustin/go-humanize"
"io/ioutil"
"net/url"
"os"
@ -143,7 +144,7 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path st
}
// find the chunk count
chunkSize := int64(*copy.maxMB * 1024 * 1024)
chunkSize := int64(*copy.maxMB * humanize.MiByte)
chunkCount := 1
if chunkSize > 0 && fi.Size() > chunkSize {
chunkCount = int(fi.Size()/chunkSize) + 1

2
weed/command/fix.go

@ -81,8 +81,8 @@ func runFix(cmd *Command, args []string) bool {
err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner)
if err != nil {
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
os.Remove(indexFileName)
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
}
return true

10
weed/command/master.go

@ -36,7 +36,8 @@ var (
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
volumeSizeLimitMiB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to over-sized volumes. (Deprecated, please use volumeSizeLimit!)")
volumeSizeLimitArg = cmdMaster.Flag.String("volumeSizeLimit", "30000MB", "Master stops directing writes to over-sized volumes upper to 30000MB.(eg, 30GB, 20GiB, 500MiB and etc)")
volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
@ -64,13 +65,12 @@ func runMaster(cmd *Command, args []string) bool {
if *masterWhiteListOption != "" {
masterWhiteList = strings.Split(*masterWhiteListOption, ",")
}
if *volumeSizeLimitMB > 30*1000 {
glog.Fatalf("volumeSizeLimitMB should be smaller than 30000")
}
volumeSizeLimit := util.ParseVolumeSizeLimit(*volumeSizeLimitMiB, *volumeSizeLimitArg)
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
*volumeSizeLimitMB, *volumePreallocate,
volumeSizeLimit, *volumePreallocate,
*mpulse, *defaultReplicaPlacement, *garbageThreshold,
masterWhiteList, *masterSecureKey,
)

4
weed/command/mount.go

@ -15,7 +15,7 @@ type MountOptions struct {
collection *string
replication *string
ttlSec *int
chunkSizeLimitMB *int
chunkSizeLimitMiB *int
dataCenter *string
}
@ -35,7 +35,7 @@ func init() {
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
mountOptions.chunkSizeLimitMiB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")

5
weed/command/mount_std.go

@ -4,6 +4,7 @@ package command
import (
"fmt"
"github.com/dustin/go-humanize"
"os"
"os/user"
"runtime"
@ -24,7 +25,7 @@ func runMount(cmd *Command, args []string) bool {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
if *mountOptions.chunkSizeLimitMB <= 0 {
if *mountOptions.chunkSizeLimitMiB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.")
return false
}
@ -95,7 +96,7 @@ func runMount(cmd *Command, args []string) bool {
Collection: *mountOptions.collection,
Replication: *mountOptions.replication,
TtlSec: int32(*mountOptions.ttlSec),
ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMB) * 1024 * 1024,
ChunkSizeLimit: int64(*mountOptions.chunkSizeLimitMiB) * humanize.MiByte,
DataCenter: *mountOptions.dataCenter,
DirListingLimit: *mountOptions.dirListingLimit,
EntryCacheTtl: 3 * time.Second,

9
weed/command/server.go

@ -63,7 +63,8 @@ var (
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterGrpcPort = cmdServer.Flag.Int("master.port.grpc", 0, "master grpc server listen port, default to http port + 10000")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterVolumeSizeLimitMiB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to over-sized volumes.(Deprecated, please use master.volumeSizeLimit!)")
masterVolumeSizeLimitArg = cmdServer.Flag.String("master.volumeSizeLimit", "30GB", "Master stops directing writes to over-sized volumes upper to 30GB.(eg, 30GB, 20GiB, 500MiB and etc)")
masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
@ -134,9 +135,7 @@ func runServer(cmd *Command, args []string) bool {
folders := strings.Split(*volumeDataFolders, ",")
if *masterVolumeSizeLimitMB > 30*1000 {
glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000")
}
masterVolumeSizeLimit := util.ParseVolumeSizeLimit(*masterVolumeSizeLimitMiB, *masterVolumeSizeLimitArg)
if *masterMetaFolder == "" {
*masterMetaFolder = folders[0]
@ -168,7 +167,7 @@ func runServer(cmd *Command, args []string) bool {
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
*masterVolumeSizeLimitMB, *masterVolumePreallocate,
masterVolumeSizeLimit, *masterVolumePreallocate,
*pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
serverWhiteList, *serverSecureKey,
)

5
weed/operation/submit.go

@ -2,6 +2,7 @@ package operation
import (
"bytes"
"github.com/dustin/go-humanize"
"io"
"mime"
"net/url"
@ -120,8 +121,8 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
defer closer.Close()
}
baseName := path.Base(fi.FileName)
if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
chunkSize := int64(maxMB * 1024 * 1024)
if maxMB > 0 && fi.FileSize > int64(maxMB*humanize.MiByte) {
chunkSize := int64(maxMB * humanize.MiByte)
chunks := fi.FileSize/chunkSize + 1
cm := ChunkManifest{
Name: baseName,

5
weed/server/filer_server_handlers_write_autochunk.go

@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
"github.com/dustin/go-humanize"
"io"
"io/ioutil"
"net/http"
@ -37,7 +38,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
}
glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
chunkSize := 1024 * 1024 * maxMB
chunkSize := maxMB * humanize.MiByte
contentLength := int64(0)
if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
@ -82,7 +83,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
var fileChunks []*filer_pb.FileChunk
totalBytesRead := int64(0)
tmpBufferSize := int32(1024 * 1024)
tmpBufferSize := int32(1 * humanize.MiByte)
tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
chunkBufOffset := int32(0)

2
weed/server/master_grpc_server.go

@ -66,7 +66,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
VolumeSizeLimit: ms.volumeSizeLimit,
SecretKey: string(ms.guard.SecretKey),
}); err != nil {
return err

12
weed/server/master_server.go

@ -20,7 +20,7 @@ import (
type MasterServer struct {
port int
metaFolder string
volumeSizeLimitMB uint
volumeSizeLimit uint64
preallocate int64
pulseSeconds int
defaultReplicaPlacement string
@ -39,7 +39,7 @@ type MasterServer struct {
}
func NewMasterServer(r *mux.Router, port int, metaFolder string,
volumeSizeLimitMB uint,
volumeSizeLimit uint64,
preallocate bool,
pulseSeconds int,
defaultReplicaPlacement string,
@ -50,11 +50,11 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
var preallocateSize int64
if preallocate {
preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
preallocateSize = int64(volumeSizeLimit)
}
ms := &MasterServer{
port: port,
volumeSizeLimitMB: volumeSizeLimitMB,
volumeSizeLimit: volumeSizeLimit,
preallocate: preallocateSize,
pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement,
@ -63,9 +63,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
ms.Topo = topology.NewTopology("topo", seq, volumeSizeLimit, pulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimit)
ms.guard = security.NewGuard(whiteList, secureKey)

5
weed/server/volume_grpc_sync.go

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/dustin/go-humanize"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@ -40,7 +41,7 @@ func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexReq
glog.V(2).Infof("sync volume %d index", req.VolumdId)
}
const blockSizeLimit = 1024 * 1024 * 2
const blockSizeLimit = 2 * humanize.MiByte
for i := 0; i < len(content); i += blockSizeLimit {
blockSize := len(content) - i
if blockSize > blockSizeLimit {
@ -85,7 +86,7 @@ func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataReque
glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
}
const blockSizeLimit = 1024 * 1024 * 2
const blockSizeLimit = 2 * humanize.MiByte
for i := 0; i < len(content); i += blockSizeLimit {
blockSize := len(content) - i
if blockSize > blockSizeLimit {

3
weed/storage/needle/compact_map_perf_test.go

@ -2,6 +2,7 @@ package needle
import (
"fmt"
"github.com/dustin/go-humanize"
"log"
"os"
"runtime"
@ -83,5 +84,5 @@ func PrintMemUsage() {
fmt.Printf("\tNumGC = %v", m.NumGC)
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
return b / humanize.MiByte
}

3
weed/storage/types/needle_types.go

@ -3,6 +3,7 @@ package types
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/dustin/go-humanize"
"math"
"strconv"
)
@ -16,7 +17,7 @@ const (
NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
MaxPossibleVolumeSize = 4 * humanize.GiByte * 8
TombstoneFileSize = math.MaxUint32
CookieSize = 4
)

2
weed/topology/data_center.go

@ -7,7 +7,7 @@ type DataCenter struct {
func NewDataCenter(id string) *DataCenter {
dc := &DataCenter{}
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.nodeType = NodeTypeDataCenter
dc.children = make(map[NodeId]Node)
dc.NodeImpl.value = dc
return dc

2
weed/topology/data_node.go

@ -20,7 +20,7 @@ type DataNode struct {
func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
s.nodeType = NodeTypeDataNode
s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
s.NodeImpl.value = s
return s

13
weed/topology/node.go

@ -53,6 +53,13 @@ type NodeImpl struct {
value interface{}
}
const (
NodeTypeTopology = "Topology"
NodeTypeDataCenter = "DataCenter"
NodeTypeRack = "Rack"
NodeTypeDataNode = "DataNode"
)
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
candidates := make([]Node, 0, len(n.children))
@ -109,13 +116,13 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
}
func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode"
return n.nodeType == NodeTypeDataNode
}
func (n *NodeImpl) IsRack() bool {
return n.nodeType == "Rack"
return n.nodeType == NodeTypeRack
}
func (n *NodeImpl) IsDataCenter() bool {
return n.nodeType == "DataCenter"
return n.nodeType == NodeTypeDataCenter
}
func (n *NodeImpl) String() string {
if n.parent != nil {

2
weed/topology/rack.go

@ -12,7 +12,7 @@ type Rack struct {
func NewRack(id string) *Rack {
r := &Rack{}
r.id = NodeId(id)
r.nodeType = "Rack"
r.nodeType = NodeTypeRack
r.children = make(map[NodeId]Node)
r.NodeImpl.value = r
return r

2
weed/topology/topology.go

@ -33,7 +33,7 @@ type Topology struct {
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.nodeType = NodeTypeTopology
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()

24
weed/util/arg.go

@ -0,0 +1,24 @@
package util
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/dustin/go-humanize"
)
func ParseVolumeSizeLimit(volumeSizeLimitMiB uint, volumeSizeLimitArg string) uint64 {
volumeSizeLimit := uint64(volumeSizeLimitMiB) * humanize.MiByte
if volumeSizeLimitArg != "" {
var err error
volumeSizeLimit, err = humanize.ParseBytes(volumeSizeLimitArg)
Assert(err != nil, "Parse volumeSizeLimit %s : %s", volumeSizeLimitArg, err)
}
Assert(volumeSizeLimit > 30*1000*humanize.MiByte, "volumeSizeLimit should be smaller than 30000MB")
return volumeSizeLimit
}
func Assert(condition bool, format string, args ...interface{}) {
if condition {
glog.Fatalf(format, args...)
}
}

23
weed/util/arg_test.go

@ -0,0 +1,23 @@
package util
import (
"github.com/dustin/go-humanize"
"testing"
)
func TestParseVolumeSizeLimit(t *testing.T) {
volumeSizeLimit := ParseVolumeSizeLimit(10, "")
if volumeSizeLimit != 10*humanize.MiByte {
t.Fail()
}
volumeSizeLimit = ParseVolumeSizeLimit(10, "11GiB")
if volumeSizeLimit != 11*humanize.GiByte {
t.Fail()
}
volumeSizeLimit = ParseVolumeSizeLimit(10, "11811160064")
if volumeSizeLimit != 11*humanize.GiByte {
t.Fail()
}
}

3
weed/weed.go

@ -6,6 +6,7 @@ package main
import (
"flag"
"fmt"
"github.com/dustin/go-humanize"
"io"
"math/rand"
"os"
@ -37,7 +38,7 @@ func setExitStatus(n int) {
}
func main() {
glog.MaxSize = 1024 * 1024 * 32
glog.MaxSize = 32 * humanize.MiByte
rand.Seed(time.Now().UnixNano())
flag.Usage = usage
flag.Parse()

Loading…
Cancel
Save