From 5a51340e90441cf92f0cbd3daa4b905b9c55abd6 Mon Sep 17 00:00:00 2001 From: bingoohuang Date: Wed, 23 Jan 2019 11:02:46 +0800 Subject: [PATCH] volume disk watermark checked when master assign fid --- .../change_superblock/change_superblock.go | 27 +-- unmaintained/fix_dat/fix_dat.go | 18 +- unmaintained/see_dat/see_dat.go | 6 +- unmaintained/see_idx/see_idx.go | 6 +- weed/command/benchmark.go | 13 +- weed/command/compact.go | 17 +- weed/command/export.go | 19 +- weed/command/filer.go | 36 ++-- weed/command/filer_export.go | 21 +- weed/command/filer_replication.go | 27 +-- weed/command/fix.go | 7 +- weed/command/master.go | 42 ++-- weed/command/s3.go | 28 +-- weed/command/server.go | 31 +-- weed/command/volume.go | 63 +++--- weed/filer2/configuration.go | 15 +- weed/notification/configuration.go | 14 +- .../google_pub_sub/google_pub_sub.go | 30 +-- weed/operation/delete_content.go | 22 +- weed/operation/lookup.go | 16 +- weed/operation/submit.go | 10 +- weed/replication/sink/azuresink/azure_sink.go | 5 +- weed/replication/sink/gcssink/gcs_sink.go | 9 +- .../sub/notification_google_pub_sub.go | 37 ++-- weed/server/filer_server.go | 21 +- weed/server/master_grpc_server.go | 4 +- weed/server/master_server_handlers_admin.go | 2 +- weed/storage/volume_super_block.go | 12 +- weed/topology/topology.go | 2 +- weed/topology/topology_event_handling.go | 6 - weed/topology/topology_vacuum.go | 12 +- weed/topology/volume_growth.go | 12 +- weed/topology/volume_layout.go | 201 ++++++------------ weed/topology/volume_location_list.go | 73 ++++--- weed/util/arg.go | 10 +- weed/util/glog.go | 15 ++ weed/util/sys.go | 10 + 37 files changed, 358 insertions(+), 541 deletions(-) create mode 100644 weed/util/glog.go create mode 100644 weed/util/sys.go diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 779580a9b..51859b7db 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -3,11 +3,11 @@ package main import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "strconv" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -43,16 +43,11 @@ func main() { fileName = *fixVolumeCollection + "_" + fileName } datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDWR, 0644) - if err != nil { - glog.Fatalf("Open Volume Data File [ERROR]: %v", err) - } + util.LogFatalIfError(err, "Open Volume Data File [ERROR]: %v", err) defer datFile.Close() superBlock, err := storage.ReadSuperBlock(datFile) - - if err != nil { - glog.Fatalf("cannot parse existing super block: %v", err) - } + util.LogFatalIfError(err, "cannot parse existing super block: %v", err) fmt.Printf("Current Volume Replication: %s\n", superBlock.ReplicaPlacement) fmt.Printf("Current Volume TTL: %s\n", superBlock.Ttl.String()) @@ -61,10 +56,7 @@ func main() { if *targetReplica != "" { replica, err := storage.NewReplicaPlacementFromString(*targetReplica) - - if err != nil { - glog.Fatalf("cannot parse target replica %s: %v", *targetReplica, err) - } + util.LogFatalIfError(err, "cannot parse target replica %s: %v", *targetReplica, err) fmt.Printf("Changing replication to: %s\n", replica) @@ -74,10 +66,7 @@ func main() { if *targetTTL != "" { ttl, err := storage.ReadTTL(*targetTTL) - - if err != nil { - glog.Fatalf("cannot parse target ttl %s: %v", *targetTTL, err) - } + util.LogFatalIfError(err, "cannot parse target ttl %s: %v", *targetTTL, err) fmt.Printf("Changing ttl to: %s\n", ttl) @@ -86,12 +75,10 @@ func main() { } if hasChange { - header := superBlock.Bytes() - if n, e := datFile.WriteAt(header, 0); n == 0 || e != nil { - glog.Fatalf("cannot write super block: %v", e) - } + n, e := datFile.WriteAt(header, 0) + util.LogFatalIf(n == 0 || e != nil, "cannot write super block: %v", e) fmt.Println("Change Applied.") } diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 9eb64b3b4..5f76a0c65 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -8,7 +8,6 @@ import ( "path" "strconv" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" @@ -39,26 +38,19 @@ func main() { fileName = *fixVolumeCollection + "_" + fileName } indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Read Volume Index %v", err) - } + util.LogFatalIfError(err, "Read Volume Index %v", err) defer indexFile.Close() + datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Read Volume Data %v", err) - } + util.LogFatalIfError(err, "Read Volume Data %v", err) defer datFile.Close() newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed")) - if err != nil { - glog.Fatalf("Write New Volume Data %v", err) - } + util.LogFatalIfError(err, "Write New Volume Data %v", err) defer newDatFile.Close() superBlock, err := storage.ReadSuperBlock(datFile) - if err != nil { - glog.Fatalf("Read Volume Data superblock %v", err) - } + util.LogFatalIfError(err, "Read Volume Data superblock %v", err) newDatFile.Write(superBlock.Bytes()) iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) { diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index f79c0a6a9..c586a9532 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -4,6 +4,7 @@ import ( "flag" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" ) var ( @@ -37,8 +38,5 @@ func main() { scanner := &VolumeFileScanner4SeeDat{} err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) - if err != nil { - glog.Fatalf("Reading Volume File [ERROR] %s\n", err) - } - + util.LogFatalIfError(err, "Reading Volume File [ERROR] %s\n", err) } diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go index 23ca04c2e..dbb217944 100644 --- a/unmaintained/see_idx/see_idx.go +++ b/unmaintained/see_idx/see_idx.go @@ -3,11 +3,11 @@ package main import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "strconv" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -30,9 +30,7 @@ func main() { fileName = *fixVolumeCollection + "_" + fileName } indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } + util.LogFatalIfError(err, "Create Volume Index [ERROR] %s\n", err) defer indexFile.Close() storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 60fd88ccd..75343e733 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -103,10 +103,7 @@ var ( func runBenchmark(cmd *Command, args []string) bool { fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) - if *b.maxCpu < 1 { - *b.maxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*b.maxCpu) + util.GoMaxProcs(b.maxCpu) if *b.cpuprofile != "" { f, err := os.Create(*b.cpuprofile) if err != nil { @@ -283,9 +280,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - glog.Fatalf("File to create file %s: %s\n", fileName, err) - } + util.LogFatalIfError(err, "File to create file %s: %s\n", fileName, err) defer file.Close() for { @@ -302,9 +297,7 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. - if err != nil { - glog.Fatalf("File to read file %s: %s\n", fileName, err) - } + util.LogFatalIfError(err, "File to read file %s: %s\n", fileName, err) defer file.Close() random := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/weed/command/compact.go b/weed/command/compact.go index 0dd4efe0e..d05b01bb0 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -1,8 +1,8 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -38,18 +38,15 @@ func runCompact(cmd *Command, args []string) bool { vid := storage.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate) - if err != nil { - glog.Fatalf("Load Volume [ERROR] %s\n", err) - } + util.LogFatalIfError(err, "Load Volume [ERROR] %s\n", err) + if *compactMethod == 0 { - if err = v.Compact(preallocate); err != nil { - glog.Fatalf("Compact Volume [ERROR] %s\n", err) - } + err = v.Compact(preallocate) } else { - if err = v.Compact2(); err != nil { - glog.Fatalf("Compact Volume [ERROR] %s\n", err) - } + err = v.Compact2() } + util.LogFatalIfError(err, "Compact Volume [ERROR] %s\n", err) + return true } diff --git a/weed/command/export.go b/weed/command/export.go index 5c7e064ce..9f96d6125 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -4,6 +4,7 @@ import ( "archive/tar" "bytes" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "path/filepath" @@ -171,9 +172,8 @@ func runExport(cmd *Command, args []string) bool { if *output == "-" { outputFile = os.Stdout } else { - if outputFile, err = os.Create(*output); err != nil { - glog.Fatalf("cannot open output tar %s: %s", *output, err) - } + outputFile, err = os.Create(*output) + util.LogFatalIfError(err, "cannot open output tar %s: %s", *output, err) } defer outputFile.Close() tarOutputFile = tar.NewWriter(outputFile) @@ -191,15 +191,11 @@ func runExport(cmd *Command, args []string) bool { } vid := storage.VolumeId(*export.volumeId) indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } + util.LogFatalIfError(err, "Create Volume Index [ERROR] %s\n", err) defer indexFile.Close() needleMap, err := storage.LoadBtreeNeedleMap(indexFile) - if err != nil { - glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) - } + util.LogFatalIfError(err, "cannot load needle map from %s: %s", indexFile.Name(), err) volumeFileScanner := &VolumeFileScanner4Export{ needleMap: needleMap, @@ -211,9 +207,8 @@ func runExport(cmd *Command, args []string) bool { } err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner) - if err != nil && err != io.EOF { - glog.Fatalf("Export Volume File [ERROR] %s\n", err) - } + util.LogFatalIf(err != nil && err != io.EOF, "Export Volume File [ERROR] %s\n", err) + return true } diff --git a/weed/command/filer.go b/weed/command/filer.go index 0c1950f96..39a681e22 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -76,14 +76,11 @@ var cmdFiler = &Command{ } func runFiler(cmd *Command, args []string) bool { - f.startFiler() - return true } func (fo *FilerOptions) startFiler() { - defaultMux := http.NewServeMux() publicVolumeMux := defaultMux @@ -96,7 +93,7 @@ func (fo *FilerOptions) startFiler() { defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerdb" } - fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ + fs, nfsErr := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ Masters: strings.Split(*f.masters, ","), Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, @@ -108,21 +105,19 @@ func (fo *FilerOptions) startFiler() { DataCenter: *fo.dataCenter, DefaultLevelDbDir: defaultLevelDbDirectory, }) - if nfs_err != nil { - glog.Fatalf("Filer startup error: %v", nfs_err) - } + + util.LogFatalIfError(nfsErr, "Filer startup error: %v", nfsErr) if *fo.publicPort != 0 { publicListeningAddress := *fo.ip + ":" + strconv.Itoa(*fo.publicPort) glog.V(0).Infoln("Start Seaweed filer server", util.VERSION, "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, 0) - if e != nil { - glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e) - } + + util.LogFatalIfError(e, "Filer server public listener error on port %d:%v", *fo.publicPort, e) + go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } + e := http.Serve(publicListener, publicVolumeMux) + util.LogFatalIfError(e, "Volume server fail to serve public: %v", e) }() } @@ -131,9 +126,7 @@ func (fo *FilerOptions) startFiler() { ":"+strconv.Itoa(*fo.port), time.Duration(10)*time.Second, ) - if e != nil { - glog.Fatalf("Filer listener error: %v", e) - } + util.LogFatalIfError(e, "Filer listener error: %v", e) // starting grpc server grpcPort := *fo.grpcPort @@ -141,17 +134,14 @@ func (fo *FilerOptions) startFiler() { grpcPort = *fo.port + 10000 } grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) - } + util.LogFatalIfError(err, "failed to listen on grpc port %d: %v", grpcPort, err) + grpcS := util.NewGrpcServer() filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) httpS := &http.Server{Handler: defaultMux} - if err := httpS.Serve(filerListener); err != nil { - glog.Fatalf("Filer Fail to serve: %v", e) - } - + err = httpS.Serve(filerListener) + util.LogFatalIfError(err, "Filer Fail to serve: %v", err) } diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go index 7a2e7920a..038128a48 100644 --- a/weed/command/filer_export.go +++ b/weed/command/filer_export.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" ) @@ -55,12 +56,10 @@ func runFilerExport(cmd *Command, args []string) bool { for _, store := range filer2.Stores { if store.GetName() == *filerExportSourceStore || *filerExportSourceStore == "" && config.GetBool(store.GetName()+".enabled") { viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize source store for %s: %+v", - store.GetName(), err) - } else { - sourceStore = store - } + err := store.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to initialize source store for %s: %+v", store.GetName(), err) + + sourceStore = store break } } @@ -68,12 +67,10 @@ func runFilerExport(cmd *Command, args []string) bool { for _, store := range filer2.Stores { if store.GetName() == *filerExportTargetStore { viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize target store for %s: %+v", - store.GetName(), err) - } else { - targetStore = store - } + err := store.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to Jinitialize target store for %s: %+v", store.GetName(), err) + + targetStore = store break } } diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 3384e4023..c1ec17d90 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -1,6 +1,7 @@ package command import ( + "github.com/chrislusf/seaweedfs/weed/util" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -46,10 +47,9 @@ func runFilerReplicate(cmd *Command, args []string) bool { for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { viperSub := config.Sub("notification." + input.GetName()) - if err := input.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize notification input for %s: %+v", - input.GetName(), err) - } + err := input.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to initialize notification input for %s: %+v", input.GetName(), err) + glog.V(0).Infof("Configure notification input to %s", input.GetName()) notificationInput = input break @@ -68,9 +68,8 @@ func runFilerReplicate(cmd *Command, args []string) bool { if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { fromDir := sourceConfig.GetString("directory") toDir := sinkConfig.GetString("directory") - if strings.HasPrefix(toDir, fromDir) { - glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) - } + util.LogFatalIf(strings.HasPrefix(toDir, fromDir), + "recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) } } @@ -78,10 +77,9 @@ func runFilerReplicate(cmd *Command, args []string) bool { for _, sk := range sink.Sinks { if config.GetBool("sink." + sk.GetName() + ".enabled") { viperSub := config.Sub("sink." + sk.GetName()) - if err := sk.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize sink for %s: %+v", - sk.GetName(), err) - } + err := sk.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to initialize sink for %s: %+v", sk.GetName(), err) + glog.V(0).Infof("Configure sink to %s", sk.GetName()) dataSink = sk break @@ -129,11 +127,8 @@ func validateOneEnabledInput(config *viper.Viper) { enabledInput := "" for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { - if enabledInput == "" { - enabledInput = input.GetName() - } else { - glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName()) - } + util.LogFatalIf(enabledInput != "", "Notification input is enabled for both %s and %s", enabledInput, input.GetName()) + enabledInput = input.GetName() } } } diff --git a/weed/command/fix.go b/weed/command/fix.go index a800978c6..826d5cc75 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -1,6 +1,7 @@ package command import ( + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "strconv" @@ -66,9 +67,7 @@ func runFix(cmd *Command, args []string) bool { } indexFileName := path.Join(*fixVolumePath, baseFileName+".idx") indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - glog.Fatalf("Create Volume Index [ERROR] %s\n", err) - } + util.LogFatalIfError(err, "Create Volume Index [ERROR] %s\n", err) defer indexFile.Close() nm := storage.NewBtreeNeedleMap(indexFile) @@ -81,9 +80,9 @@ func runFix(cmd *Command, args []string) bool { err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner) if err != nil { - glog.Fatalf("Export Volume File [ERROR] %s\n", err) os.Remove(indexFileName) } + util.LogFatalIfError(err, "Export Volume File [ERROR] %s\n", err) return true } diff --git a/weed/command/master.go b/weed/command/master.go index 03dd64332..a760d6971 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -3,7 +3,6 @@ package command import ( "net/http" "os" - "runtime" "strconv" "strings" "time" @@ -53,15 +52,12 @@ var ( ) func runMaster(cmd *Command, args []string) bool { - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) + util.GoMaxProcs(mMaxCpu) util.SetupProfiling(*masterCpuProfile, *masterMemProfile) - if err := util.TestFolderWritable(*metaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) - } + err := util.TestFolderWritable(*metaFolder) + util.LogFatalIfError(err, "Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) + if *masterWhiteListOption != "" { masterWhiteList = strings.Split(*masterWhiteListOption, ",") } @@ -80,9 +76,7 @@ func runMaster(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) masterListener, e := util.NewListener(listeningAddress, 0) - if e != nil { - glog.Fatalf("Master startup error: %v", e) - } + util.LogFatalIfError(e, "Master startup error: %v", e) go func() { time.Sleep(100 * time.Millisecond) @@ -97,24 +91,22 @@ func runMaster(cmd *Command, args []string) bool { if grpcPort == 0 { grpcPort = *mport + 10000 } - grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) - } + grpcListener, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0) + util.LogFatalIfError(err, "master failed to listen on grpc port %d: %v", grpcPort, err) + // Create your protocol servers. - grpcS := util.NewGrpcServer() - master_pb.RegisterSeaweedServer(grpcS, ms) - reflection.Register(grpcS) + grpcServer := util.NewGrpcServer() + master_pb.RegisterSeaweedServer(grpcServer, ms) + reflection.Register(grpcServer) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort) - grpcS.Serve(grpcL) + grpcServer.Serve(grpcListener) }() // start http server httpS := &http.Server{Handler: r} - if err := httpS.Serve(masterListener); err != nil { - glog.Fatalf("master server failed to serve: %v", err) - } + err = httpS.Serve(masterListener) + util.LogFatalIfError(err, "master server failed to serve: %v", err) return true } @@ -137,8 +129,8 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st if !hasSelf { peerCount++ } - if peerCount%2 == 0 { - glog.Fatalf("Only odd number of masters are supported!") - } + + util.LogFatalIf(peerCount%2 == 0, "Only odd number of masters are supported!") + return } diff --git a/weed/command/s3.go b/weed/command/s3.go index 16a9490ff..7b93aec02 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -45,42 +45,34 @@ var cmdS3 = &Command{ } func runS3(cmd *Command, args []string) bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*s3options.filer, *s3options.filerGrpcPort) - if err != nil { - glog.Fatal(err) - return false - } + util.LogFatalIfError(err, "S3 API Server Fail to parseFilerGrpcAddress: %v", err) router := mux.NewRouter().SkipClean(true) - _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ + _, err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ Filer: *s3options.filer, FilerGrpcAddress: filerGrpcAddress, DomainName: *s3options.domainName, BucketsPath: *s3options.filerBucketsPath, }) - if s3ApiServer_err != nil { - glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) - } + util.LogFatalIfError(err, "S3 API Server startup error: %v", err) httpS := &http.Server{Handler: router} listenAddress := fmt.Sprintf(":%d", *s3options.port) s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) - if err != nil { - glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err) - } + util.LogFatalIfError(err, "S3 API Server listener on %s error: %v", listenAddress, err) if *s3options.tlsPrivateKey != "" { - if err = httpS.ServeTLS(s3ApiListener, *s3options.tlsCertificate, *s3options.tlsPrivateKey); err != nil { - glog.Fatalf("S3 API Server Fail to serve: %v", err) - } + err = httpS.ServeTLS(s3ApiListener, *s3options.tlsCertificate, *s3options.tlsPrivateKey) + util.LogFatalIfError(err, "S3 API Server Fail to serve: %v", err) + glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.VERSION, *s3options.port) } else { - if err = httpS.Serve(s3ApiListener); err != nil { - glog.Fatalf("S3 API Server Fail to serve: %v", err) - } + err = httpS.Serve(s3ApiListener) + util.LogFatalIfError(err, "S3 API Server Fail to serve: %v", err) + glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.VERSION, *s3options.port) } diff --git a/weed/command/server.go b/weed/command/server.go index ea594744f..1097ae5eb 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -3,7 +3,6 @@ package command import ( "net/http" "os" - "runtime" "runtime/pprof" "strconv" "strings" @@ -94,7 +93,7 @@ func init() { serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") - serverOptions.v.diskWaterMark = cmdVolume.Flag.String("volume.diskWaterMark", "10GB", "disk watermark low to switch volume to read-only(eg 5G,100M,5GiB,100MiB)") + serverOptions.v.diskWaterMark = cmdServer.Flag.String("volume.diskWaterMark", "10GB", "disk watermark low to switch volume to read-only(eg 5G,100M,5GiB,100MiB)") } func runServer(cmd *Command, args []string) bool { @@ -129,10 +128,7 @@ func runServer(cmd *Command, args []string) bool { *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement } - if *serverMaxCpu < 1 { - *serverMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*serverMaxCpu) + util.GoMaxProcs(serverMaxCpu) folders := strings.Split(*volumeDataFolders, ",") @@ -141,9 +137,9 @@ func runServer(cmd *Command, args []string) bool { if *masterMetaFolder == "" { *masterMetaFolder = folders[0] } - if err := util.TestFolderWritable(*masterMetaFolder); err != nil { - glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) - } + err := util.TestFolderWritable(*masterMetaFolder) + util.LogFatalIfError(err, "Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) + filerOptions.defaultLevelDbDirectory = masterMetaFolder if *serverWhiteListOption != "" { @@ -153,9 +149,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingFiler { go func() { time.Sleep(1 * time.Second) - filerOptions.startFiler() - }() } @@ -175,9 +169,7 @@ func runServer(cmd *Command, args []string) bool { glog.V(0).Infof("Start Seaweed Master %s at %s:%d", util.VERSION, *serverIp, *masterPort) masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0) - if e != nil { - glog.Fatalf("Master startup error: %v", e) - } + util.LogFatalIfError(e, "Master startup error: %v", e) go func() { // starting grpc server @@ -186,9 +178,8 @@ func runServer(cmd *Command, args []string) bool { grpcPort = *masterPort + 10000 } grpcL, err := util.NewListener(*serverIp+":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) - } + util.LogFatalIfError(err, "master failed to listen on grpc port %d: %v", grpcPort, err) + // Create your protocol servers. grpcS := util.NewGrpcServer() master_pb.RegisterSeaweedServer(grpcS, ms) @@ -211,10 +202,8 @@ func runServer(cmd *Command, args []string) bool { // start http server httpS := &http.Server{Handler: r} - if err := httpS.Serve(masterListener); err != nil { - glog.Fatalf("master server failed to serve: %v", err) - } - + err := httpS.Serve(masterListener) + util.LogFatalIfError(err, "master server failed to serve: %v", err) }() volumeWait.Wait() diff --git a/weed/command/volume.go b/weed/command/volume.go index 96e013c02..4ea2e0950 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -3,7 +3,6 @@ package command import ( "net/http" "os" - "runtime" "runtime/pprof" "strconv" "strings" @@ -81,10 +80,7 @@ var ( ) func runVolume(cmd *Command, args []string) bool { - if *v.maxCpu < 1 { - *v.maxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*v.maxCpu) + util.GoMaxProcs(v.maxCpu) util.SetupProfiling(*v.cpuProfile, *v.memProfile) v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) @@ -98,19 +94,16 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v v.folders = strings.Split(volumeFolders, ",") maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { - if max, e := strconv.Atoi(maxString); e == nil { - v.folderMaxLimits = append(v.folderMaxLimits, max) - } else { - glog.Fatalf("The max specified in -max not a valid number %s", maxString) - } - } - if len(v.folders) != len(v.folderMaxLimits) { - glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) + max, e := strconv.Atoi(maxString) + util.LogFatalIfError(e, "The max specified in -max not a valid number %s", maxString) + v.folderMaxLimits = append(v.folderMaxLimits, max) } + util.LogFatalIf(len(v.folders) != len(v.folderMaxLimits), + "%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) + for _, folder := range v.folders { - if err := util.TestFolderWritable(folder); err != nil { - glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) - } + err := util.TestFolderWritable(folder) + util.LogFatalIfError(err, "Check Data Folder(-dir) Writable %s : %s", folder, err) } //security related white list configuration @@ -141,9 +134,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v masters := *v.masters diskWaterMarkBytes, err := humanize.ParseBytes(*v.diskWaterMark) - if err != nil { - glog.Fatalf("Check diskWaterMark %s error:%v ", *v.diskWaterMark, err) - } + util.LogFatalIfError(err, "Check diskWaterMark %s error:%v ", *v.diskWaterMark, err) volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, @@ -156,20 +147,17 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress) listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } + util.LogFatalIfError(e, "Volume server listener error:%v", e) + if isSeparatedPublicPort { publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) - if e != nil { - glog.Fatalf("Volume server listener error:%v", e) - } + util.LogFatalIfError(e, "Volume server listener error:%v", e) + go func() { - if e := http.Serve(publicListener, publicVolumeMux); e != nil { - glog.Fatalf("Volume server fail to serve public: %v", e) - } + e := http.Serve(publicListener, publicVolumeMux) + util.LogFatalIfError(e, "Volume server fail to serve public: %v", e) }() } @@ -180,17 +168,14 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // starting grpc server grpcPort := *v.port + 10000 - grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) - } - grpcS := util.NewGrpcServer() - volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer) - reflection.Register(grpcS) - go grpcS.Serve(grpcL) + grpcListener, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) + util.LogFatalIfError(err, "failed to listen on grpc port %d: %v", grpcPort, err) - if e := http.Serve(listener, volumeMux); e != nil { - glog.Fatalf("Volume server fail to serve: %v", e) - } + grpcServer := util.NewGrpcServer() + volume_server_pb.RegisterVolumeServerServer(grpcServer, volumeServer) + reflection.Register(grpcServer) + go grpcServer.Serve(grpcListener) + e = http.Serve(listener, volumeMux) + util.LogFatalIfError(e, "Volume server fail to serve: %v", e) } diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go index 7b05b53dc..4cc20e9d7 100644 --- a/weed/filer2/configuration.go +++ b/weed/filer2/configuration.go @@ -1,6 +1,7 @@ package filer2 import ( + "github.com/chrislusf/seaweedfs/weed/util" "os" "github.com/chrislusf/seaweedfs/weed/glog" @@ -18,10 +19,9 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { for _, store := range Stores { if config.GetBool(store.GetName() + ".enabled") { viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize store for %s: %+v", - store.GetName(), err) - } + err := store.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to initialize store for %s: %+v", store.GetName(), err) + f.SetStore(store) glog.V(0).Infof("Configure filer for %s", store.GetName()) return @@ -41,11 +41,8 @@ func validateOneEnabledStore(config *viper.Viper) { enabledStore := "" for _, store := range Stores { if config.GetBool(store.GetName() + ".enabled") { - if enabledStore == "" { - enabledStore = store.GetName() - } else { - glog.Fatalf("Filer store is enabled for both %s and %s", enabledStore, store.GetName()) - } + util.LogFatalIf(enabledStore != "", "Filer store is enabled for both %s and %s", enabledStore, store.GetName()) + enabledStore = store.GetName() } } } diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index 7f8765cc3..bdbdbeeb7 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -32,10 +32,9 @@ func LoadConfiguration(config *viper.Viper) { for _, queue := range MessageQueues { if config.GetBool(queue.GetName() + ".enabled") { viperSub := config.Sub(queue.GetName()) - if err := queue.Initialize(viperSub); err != nil { - glog.Fatalf("Failed to initialize notification for %s: %+v", - queue.GetName(), err) - } + err := queue.Initialize(viperSub) + util.LogFatalIfError(err, "Failed to initialize notification for %s: %+v", queue.GetName(), err) + Queue = queue glog.V(0).Infof("Configure notification message queue for %s", queue.GetName()) return @@ -48,11 +47,8 @@ func validateOneEnabledQueue(config *viper.Viper) { enabledQueue := "" for _, queue := range MessageQueues { if config.GetBool(queue.GetName() + ".enabled") { - if enabledQueue == "" { - enabledQueue = queue.GetName() - } else { - glog.Fatalf("Notification message queue is enabled for both %s and %s", enabledQueue, queue.GetName()) - } + util.LogFatalIf(enabledQueue != "", "Notification message queue is enabled for both %s and %s", enabledQueue, queue.GetName()) + enabledQueue = queue.GetName() } } } diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go index 7b26bfe38..793ac1d64 100644 --- a/weed/notification/google_pub_sub/google_pub_sub.go +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -35,33 +35,25 @@ func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) ) } -func (k *GooglePubSub) initialize(google_application_credentials, projectId, topicName string) (err error) { +func (k *GooglePubSub) initialize(googleApplicationCredentials, projectId, topicName string) (err error) { ctx := context.Background() // Creates a client. - if google_application_credentials == "" { + if googleApplicationCredentials == "" { var found bool - google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") - if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") - } + googleApplicationCredentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") + util.LogFatalIf(!found, "need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") } - client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) - if err != nil { - glog.Fatalf("Failed to create client: %v", err) - } + client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(googleApplicationCredentials)) + util.LogFatalIfError(err, "Failed to create client: %v", err) k.topic = client.Topic(topicName) - if exists, err := k.topic.Exists(ctx); err == nil { - if !exists { - k.topic, err = client.CreateTopic(ctx, topicName) - if err != nil { - glog.Fatalf("Failed to create topic %s: %v", topicName, err) - } - } - } else { - glog.Fatalf("Failed to check topic %s: %v", topicName, err) + exists, err := k.topic.Exists(ctx) + util.LogFatalIfError(err, "Failed to check topic %s: %v", topicName, err) + if !exists { + k.topic, err = client.CreateTopic(ctx, topicName) + util.LogFatalIfError(err, "Failed to create topic %s: %v", topicName, err) } return nil diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 3e468e1a3..b96592d43 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -19,7 +19,7 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } -func ParseFileId(fid string) (vid string, key_cookie string, err error) { +func ParseFileId(fid string) (vid string, keyCookie string, err error) { commaIndex := strings.Index(fid, ",") if commaIndex <= 0 { return "", "", errors.New("Wrong fid format.") @@ -42,7 +42,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin var ret []*volume_server_pb.DeleteResult - vid_to_fileIds := make(map[string][]string) + vidToFileids := make(map[string][]string) var vids []string for _, fileId := range fileIds { vid, _, err := ParseFileId(fileId) @@ -54,11 +54,11 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin ) continue } - if _, ok := vid_to_fileIds[vid]; !ok { - vid_to_fileIds[vid] = make([]string, 0) + if _, ok := vidToFileids[vid]; !ok { + vidToFileids[vid] = make([]string, 0) vids = append(vids, vid) } - vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId) + vidToFileids[vid] = append(vidToFileids[vid], fileId) } lookupResults, err := lookupFunc(vids) @@ -66,7 +66,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin return ret, err } - server_to_fileIds := make(map[string][]string) + serverToFileids := make(map[string][]string) for vid, result := range lookupResults { if result.Error != "" { ret = append(ret, &volume_server_pb.DeleteResult{ @@ -77,17 +77,17 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin continue } for _, location := range result.Locations { - if _, ok := server_to_fileIds[location.Url]; !ok { - server_to_fileIds[location.Url] = make([]string, 0) + if _, ok := serverToFileids[location.Url]; !ok { + serverToFileids[location.Url] = make([]string, 0) } - server_to_fileIds[location.Url] = append( - server_to_fileIds[location.Url], vid_to_fileIds[vid]...) + serverToFileids[location.Url] = append( + serverToFileids[location.Url], vidToFileids[vid]...) } } var wg sync.WaitGroup - for server, fidList := range server_to_fileIds { + for server, fidList := range serverToFileids { wg.Add(1) go func(server string, fidList []string) { defer wg.Done() diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 562a11580..bff059042 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -33,9 +33,9 @@ var ( ) func Lookup(server string, vid string) (ret *LookupResult, err error) { - locations, cache_err := vc.Get(vid) - if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { + locations, cacheErr := vc.Get(vid) + if cacheErr != nil { + if ret, err = doLookup(server, vid); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } } else { @@ -44,7 +44,7 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } -func do_lookup(server string, vid string) (*LookupResult, error) { +func doLookup(server string, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) @@ -80,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { // LookupVolumeIds find volume locations by cache and actual lookup func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) - var unknown_vids []string + var unknownVids []string //check vid cache first for _, vid := range vids { @@ -88,11 +88,11 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err if cache_err == nil { ret[vid] = LookupResult{VolumeId: vid, Locations: locations} } else { - unknown_vids = append(unknown_vids, vid) + unknownVids = append(unknownVids, vid) } } //return success if all volume ids are known - if len(unknown_vids) == 0 { + if len(unknownVids) == 0 { return ret, nil } @@ -103,7 +103,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err defer cancel() req := &master_pb.LookupVolumeRequest{ - VolumeIds: unknown_vids, + VolumeIds: unknownVids, } resp, grpcErr := masterClient.LookupVolume(ctx, req) if grpcErr != nil { diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 7a1a3085e..5924dba1b 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -53,7 +53,7 @@ func SubmitFiles(master string, files []FilePart, } ret, err := Assign(master, ar) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err @@ -166,7 +166,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret } } fileUrl := "http://" + ret.Url + "/" + id - count, e := upload_one_chunk( + count, e := uploadOneChunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fileUrl, @@ -185,7 +185,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret ) retSize += count } - err = upload_chunked_file_manifest(fileUrl, &cm, jwt) + err = uploadChunkedFileManifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks cm.DeleteChunks(master) @@ -200,7 +200,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret return } -func upload_one_chunk(filename string, reader io.Reader, master, +func uploadOneChunk(filename string, reader io.Reader, master, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") @@ -212,7 +212,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, return uploadResult.Size, nil } -func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { +func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { buf, e := manifest.Marshal() if e != nil { return e diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 7acf37fa5..1db86f894 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -9,7 +9,6 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -54,9 +53,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e // Use your Storage account's name and key to create a credential object. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) - if err != nil { - glog.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey) - } + util.LogFatalIfError(err, "failed to create Azure credential with account name:%s key:%s", accountName, accountKey) // Create a request pipeline that is used to process HTTP(S) requests and responses. p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index c1beefc33..9c1ae07d8 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -7,7 +7,6 @@ import ( "cloud.google.com/go/storage" "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -55,14 +54,10 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str if google_application_credentials == "" { var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") - if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") - } + util.LogFatalIf(!found, "need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") } client, err := storage.NewClient(ctx, option.WithCredentialsFile(google_application_credentials)) - if err != nil { - glog.Fatalf("Failed to create client: %v", err) - } + util.LogFatalIfError(err, "Failed to create client: %v", err) g.client = client diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index ad6b42a2e..ffb13d781 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -44,41 +44,30 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId if google_application_credentials == "" { var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") - if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") - } + util.LogFatalIf(!found, "need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") } client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) - if err != nil { - glog.Fatalf("Failed to create client: %v", err) - } + util.LogFatalIfError(err, "Failed to create client: %v", err) k.topicName = topicName topic := client.Topic(topicName) - if exists, err := topic.Exists(ctx); err == nil { - if !exists { - topic, err = client.CreateTopic(ctx, topicName) - if err != nil { - glog.Fatalf("Failed to create topic %s: %v", topicName, err) - } - } - } else { - glog.Fatalf("Failed to check topic %s: %v", topicName, err) + exists, err := topic.Exists(ctx) + util.LogFatalIfError(err, "Failed to check topic %s: %v", topicName, err) + if !exists { + topic, err = client.CreateTopic(ctx, topicName) + util.LogFatalIfError(err, "Failed to create topic %s: %v", topicName, err) } subscriptionName := "seaweedfs_sub" k.sub = client.Subscription(subscriptionName) - if exists, err := k.sub.Exists(ctx); err == nil { - if !exists { - k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic}) - if err != nil { - glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) - } - } - } else { - glog.Fatalf("Failed to check subscription %s: %v", topicName, err) + exists, err = k.sub.Exists(ctx) + util.LogFatalIfError(err, "Failed to check subscription %s: %v", topicName, err) + + if !exists { + k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic}) + util.LogFatalIfError(err, "Failed to create subscription %s: %v", subscriptionName, err) } k.messageChan = make(chan *pubsub.Message, 1) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 9d70e4dac..fe257c8bc 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/util" "net/http" "os" @@ -94,17 +95,15 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n\n\n", - configFileName, configFileName, configFileName) - } else { - return false - } + util.LogFatalIf(required, "Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ + "\n\nPlease follow this example and add a filer.toml file to "+ + "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ + " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ + "\nOr use this command to generate the default toml file\n"+ + " weed scaffold -config=%s -output=.\n\n\n", + configFileName, configFileName, configFileName) + + return false } return true diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index f29dc22b1..6871f5dad 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -102,9 +102,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // tell the volume servers about the leader newLeader, err := t.Leader() if err == nil { - if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, - }); err != nil { + if err := stream.Send(&master_pb.HeartbeatResponse{Leader: newLeader}); err != nil { return err } } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index deb2d8ba1..7ecf581ae 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -138,7 +138,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r * func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) - return vl.GetActiveVolumeCount(option) > 0 + return vl.GetWritableVolumeCount(option) > 0 } func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 2a3be122e..616db5bbb 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -50,14 +50,12 @@ func (s *SuperBlock) Bytes() []byte { if s.Extra != nil { extraData, err := proto.Marshal(s.Extra) - if err != nil { - glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err) - } + util.LogFatalIfError(err, "cannot marshal super block extra %+v: %v", s.Extra, err) + extraSize := len(extraData) - if extraSize > 256*256-2 { - // reserve a couple of bits for future extension - glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2) - } + // reserve a couple of bits for future extension + util.LogFatalIf(extraSize > 256*256-2, "super block extra size is %d bigger than %d", extraSize, 256*256-2) + s.extraSize = uint16(extraSize) util.Uint16toBytes(header[6:8], s.extraSize) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 95e932f23..fd69ebb54 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -97,7 +97,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId { func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) - return vl.GetActiveVolumeCount(option) > 0 + return vl.GetWritableVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index c5459204f..0ad17df2d 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -37,10 +37,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) - if !vl.SetVolumeCapacityFull(volumeInfo.Id) { - return false - } - vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -54,8 +50,6 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.GetVolumes() { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id()) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) - vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index d6b09314b..7343798ff 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -35,7 +35,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist }(index, dn.Url(), vid) } isCheckSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case canVacuum := <-ch: isCheckSuccess = isCheckSuccess && canVacuum @@ -47,7 +47,6 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist return isCheckSuccess } func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { - vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { go func(index int, url string, vid storage.VolumeId) { @@ -68,7 +67,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli }(index, dn.Url(), vid) } isVacuumSuccess := true - for _ = range locationlist.list { + for range locationlist.list { select { case canCommit := <-ch: isVacuumSuccess = isVacuumSuccess && canCommit @@ -82,7 +81,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true for _, dn := range locationlist.list { - glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url()) + glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumdId: uint32(vid), @@ -93,7 +92,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err) isCommitSuccess = false } else { - glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url()) + glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url()) } if isCommitSuccess { vl.SetVolumeAvailable(dn, vid) @@ -133,7 +132,6 @@ func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int { } func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { - volumeLayout.accessLock.RLock() tmpMap := make(map[storage.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { @@ -144,7 +142,7 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr for vid, locationList := range tmpMap { volumeLayout.accessLock.RLock() - isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + isReadOnly, hasValue := locationList.HasReadOnly(vid) volumeLayout.accessLock.RUnlock() if hasValue && isReadOnly { diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index dfee75dc5..e3b8ab956 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -104,7 +104,7 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dcErr := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } @@ -131,8 +131,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } return nil }) - if dc_err != nil { - return nil, dc_err + if dcErr != nil { + return nil, dcErr } //find main rack and other racks @@ -188,9 +188,9 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return servers, e } } - for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) - if server, e := datacenter.ReserveOneVolume(r); e == nil { + for _, dataCenter := range otherDataCenters { + r := rand.Intn(dataCenter.FreeSpace()) + if server, e := dataCenter.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { return servers, e diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 3d043ae62..e423b6c92 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -1,7 +1,6 @@ package topology import ( - "errors" "fmt" "math/rand" "sync" @@ -13,14 +12,12 @@ import ( // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { - rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes - volumeSizeLimit uint64 - accessLock sync.RWMutex + rp *storage.ReplicaPlacement + ttl *storage.TTL + vid2location map[storage.VolumeId]*VolumeLocationList + + volumeSizeLimit uint64 + accessLock sync.RWMutex } type VolumeLayoutStats struct { @@ -31,18 +28,15 @@ type VolumeLayoutStats struct { func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ - rp: rp, - ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), - volumeSizeLimit: volumeSizeLimit, + rp: rp, + ttl: ttl, + vid2location: make(map[storage.VolumeId]*VolumeLocationList), + volumeSizeLimit: volumeSizeLimit, } } func (vl *VolumeLayout) String() string { - return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) + return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.volumeSizeLimit) } func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { @@ -58,60 +52,28 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo, err := dn.GetVolumesById(v.Id); err == nil { if vInfo.IsReadOnly() { glog.V(3).Infof("vid %d removed from writable", v.Id) - vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true return - } else { - delete(vl.readonlyVolumes, v.Id) } } else { glog.V(3).Infof("vid %d removed from writable", v.Id) - vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) return } } - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { - vl.addToWritable(v.Id) - } - } else { - vl.rememberOversizedVolumne(v) - vl.removeFromWritable(v.Id) - } -} - -func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) { - if vl.isOversized(v) { - vl.oversizedVolumes[v.Id] = true - } } func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() - vl.removeFromWritable(v.Id) delete(vl.vid2location, v.Id) } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { - for _, id := range vl.writables { - if vid == id { - return - } - } - vl.writables = append(vl.writables, vid) -} - -func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { +func (vl *VolumeLayout) isOverSized(v *storage.VolumeInfo) bool { return uint64(v.Size) >= vl.volumeSizeLimit } func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && - !v.IsReadOnly() + return !vl.isOverSized(v) && v.Version == storage.CurrentVersion && !v.IsReadOnly() } func (vl *VolumeLayout) isEmpty() bool { @@ -145,127 +107,76 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s vl.accessLock.RLock() defer vl.accessLock.RUnlock() - lenWriters := len(vl.writables) - if lenWriters <= 0 { - glog.V(0).Infoln("No more writable volumes!") - return nil, 0, nil, errors.New("No more writable volumes!") - } - if option.DataCenter == "" { - vid := vl.writables[rand.Intn(lenWriters)] - locationList := vl.vid2location[vid] - if locationList != nil { - return &vid, count, locationList, nil - } - return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") - } var vid storage.VolumeId var locationList *VolumeLocationList counter := 0 - for _, v := range vl.writables { - volumeLocationList := vl.vid2location[v] - for _, dn := range volumeLocationList.list { - if option.MatchesDataCenter(dn) { - if !option.MatchesRackDataNode(dn) { - continue - } - - counter++ - if rand.Intn(counter) < 1 { - vid, locationList = v, volumeLocationList - } + +NextVolume: + for v, vll := range vl.vid2location { + if option.ReplicaPlacement.GetCopyCount() != vll.Length() { + continue + } + + for _, dn := range vll.list { + if !option.MatchesDataCenter(dn) || !option.MatchesRackDataNode(dn) { + continue NextVolume } + + vi, _ := dn.GetVolumesById(v) + if !vl.isWritable(&vi) { + continue NextVolume + } + } + + counter++ + if rand.Intn(counter) < 1 { + vid, locationList = v, vll } } return &vid, count, locationList, nil } -func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { +func (vl *VolumeLayout) GetWritableVolumeCount(option *VolumeGrowOption) int { vl.accessLock.RLock() defer vl.accessLock.RUnlock() - if option.DataCenter == "" { - return len(vl.writables) - } counter := 0 - for _, v := range vl.writables { - for _, dn := range vl.vid2location[v].list { - if option.MatchesDataCenter(dn) { - if !option.MatchesRackDataNode(dn) { - continue - } - - counter++ - } - } - } - return counter -} -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { - toDeleteIndex := -1 - for k, id := range vl.writables { - if id == vid { - toDeleteIndex = k - break - } - } - if toDeleteIndex >= 0 { - glog.V(0).Infoln("Volume", vid, "becomes unwritable") - vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...) - return true - } - return false -} -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { - for _, v := range vl.writables { - if v == vid { - return false +NextVolume: + for vid, vll := range vl.vid2location { + if option.ReplicaPlacement.GetCopyCount() != vll.Length() { + continue } - } - glog.V(0).Infoln("Volume", vid, "becomes writable") - vl.writables = append(vl.writables, vid) - return true -} -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { - vl.accessLock.Lock() - defer vl.accessLock.Unlock() + for _, dn := range vll.list { + if !option.MatchesDataCenter(dn) || !option.MatchesRackDataNode(dn) { + continue NextVolume + } - if location, ok := vl.vid2location[vid]; ok { - if location.Remove(dn) { - if location.Length() < vl.rp.GetCopyCount() { - glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) - return vl.removeFromWritable(vid) + vi, _ := dn.GetVolumesById(vid) + if !vl.isWritable(&vi) { + continue NextVolume } } + + counter++ } - return false + + return counter } + func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() vl.vid2location[vid].Set(dn) - if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { - return vl.setVolumeWritable(vid) - } return false } -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { - vl.accessLock.Lock() - defer vl.accessLock.Unlock() - - // glog.V(0).Infoln("Volume", vid, "reaches full capacity.") - return vl.removeFromWritable(vid) -} - func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) m["replication"] = vl.rp.String() m["ttl"] = vl.ttl.String() - m["writables"] = vl.writables - //m["locations"] = vl.vid2location return m } @@ -281,7 +192,17 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes[vid] { + + writable := true + for _, dn := range vll.list { + vi, _ := dn.GetVolumesById(vid) + if !vl.isWritable(&vi) { + writable = false + break + } + } + + if writable { ret.TotalSize += size } else { ret.TotalSize += vl.volumeSizeLimit diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8d5881333..dd8cb4a94 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -14,42 +14,49 @@ func NewVolumeLocationList() *VolumeLocationList { return &VolumeLocationList{} } -func (dnll *VolumeLocationList) String() string { - return fmt.Sprintf("%v", dnll.list) +func (vll *VolumeLocationList) String() string { + return fmt.Sprintf("%v", vll.list) } -func (dnll *VolumeLocationList) Head() *DataNode { - //mark first node as master volume - return dnll.list[0] +func (vll *VolumeLocationList) Head() *DataNode { + // mark first node as master volume + return vll.list[0] } -func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) +func (vll *VolumeLocationList) Length() int { + return len(vll.list) } -func (dnll *VolumeLocationList) Set(loc *DataNode) { - for i := 0; i < len(dnll.list); i++ { - if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port { - dnll.list[i] = loc - return +func (vll *VolumeLocationList) FindDataNodeByIpPort(ip string, port int) (*DataNode, int) { + for i := 0; i < len(vll.list); i++ { + if ip == vll.list[i].Ip && port == vll.list[i].Port { + return vll.list[i], i } } - dnll.list = append(dnll.list, loc) + + return nil, -1 +} +func (vll *VolumeLocationList) Set(loc *DataNode) { + dataNode, i := vll.FindDataNodeByIpPort(loc.Ip, loc.Port) + if dataNode != nil { + vll.list[i] = loc + } else { + vll.list = append(vll.list, loc) + } } -func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) - return true - } +func (vll *VolumeLocationList) Remove(loc *DataNode) bool { + dataNode, i := vll.FindDataNodeByIpPort(loc.Ip, loc.Port) + if dataNode != nil { + vll.list = append(vll.list[:i], vll.list[i+1:]...) + return true } return false } -func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { +func (vll *VolumeLocationList) Refresh(freshThreshHold int64) { var changed bool - for _, dnl := range dnll.list { + for _, dnl := range vll.list { if dnl.LastSeen < freshThreshHold { changed = true break @@ -57,17 +64,17 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { } if changed { var l []*DataNode - for _, dnl := range dnll.list { + for _, dnl := range vll.list { if dnl.LastSeen >= freshThreshHold { l = append(l, dnl) } } - dnll.list = l + vll.list = l } } -func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { - for _, dnl := range dnll.list { +func (vll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { + for _, dnl := range vll.list { if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) if err == nil { @@ -77,3 +84,19 @@ func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int6 } return 0, 0 } + +func (vll *VolumeLocationList) HasReadOnly(id storage.VolumeId) (hasReadOnly, hasValue bool) { + for _, dn := range vll.list { + vi, err := dn.GetVolumesById(id) + if err == nil { + hasValue = true + } + + if vi.IsReadOnly() { + hasReadOnly = true + break + } + } + + return +} diff --git a/weed/util/arg.go b/weed/util/arg.go index 2c4e46daf..137f44d4c 100644 --- a/weed/util/arg.go +++ b/weed/util/arg.go @@ -1,7 +1,6 @@ package util import ( - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/dustin/go-humanize" ) @@ -10,13 +9,10 @@ func ParseVolumeSizeLimit(volumeSizeLimitMiBValue uint, volumeSizeLimitArgValue if volumeSizeLimitArgValue != "" { var err error volumeSizeLimit, err = humanize.ParseBytes(volumeSizeLimitArgValue) - if err != nil { - glog.Fatalf("Parse volumeSizeLimit %s : %s", volumeSizeLimitMiBValue, err) - } - } - if volumeSizeLimit > uint64(30*1000)*1024*1024 { - glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") + LogFatalIfError(err, "Parse volumeSizeLimit %d : %s", volumeSizeLimitMiBValue, err) } + LogFatalIf(volumeSizeLimit > uint64(30*1000)*1024*1024, "volumeSizeLimitMB should be smaller than 30000") + return volumeSizeLimit } diff --git a/weed/util/glog.go b/weed/util/glog.go new file mode 100644 index 000000000..2431da275 --- /dev/null +++ b/weed/util/glog.go @@ -0,0 +1,15 @@ +package util + +import "github.com/chrislusf/seaweedfs/weed/glog" + +func LogFatalIfError(err error, format string, args ...interface{}) { + if err != nil { + glog.Fatalf(format, args...) + } +} + +func LogFatalIf(condition bool, format string, args ...interface{}) { + if condition { + glog.Fatalf(format, args...) + } +} diff --git a/weed/util/sys.go b/weed/util/sys.go new file mode 100644 index 000000000..9c0499ef6 --- /dev/null +++ b/weed/util/sys.go @@ -0,0 +1,10 @@ +package util + +import "runtime" + +func GoMaxProcs(maxCPU *int) { + if *maxCPU < 1 { + *maxCPU = runtime.NumCPU() + } + runtime.GOMAXPROCS(*maxCPU) +}