From c2884cace2fae44dd97d718db30a22ab70151d63 Mon Sep 17 00:00:00 2001 From: Lei Liu Date: Tue, 29 Oct 2019 21:28:28 +0800 Subject: [PATCH 1/2] misc updated Signed-off-by: Lei Liu --- weed/command/mount_darwin.go | 1 - weed/command/mount_freebsd.go | 1 - weed/command/mount_linux.go | 1 - weed/server/master_grpc_server_collection.go | 4 ++-- weed/server/master_grpc_server_volume.go | 2 +- weed/server/master_server.go | 12 ++++++------ weed/server/master_server_handlers.go | 2 +- weed/server/master_server_handlers_admin.go | 10 +++++----- weed/server/raft_server.go | 2 +- weed/topology/topology.go | 4 ++-- 10 files changed, 18 insertions(+), 21 deletions(-) diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go index 5b3fff513..632691e47 100644 --- a/weed/command/mount_darwin.go +++ b/weed/command/mount_darwin.go @@ -1,7 +1,6 @@ package command import ( - "github.com/seaweedfs/fuse" ) diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go index 5b3fff513..632691e47 100644 --- a/weed/command/mount_freebsd.go +++ b/weed/command/mount_freebsd.go @@ -1,7 +1,6 @@ package command import ( - "github.com/seaweedfs/fuse" ) diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go index f07e5bfe8..7d94e5142 100644 --- a/weed/command/mount_linux.go +++ b/weed/command/mount_linux.go @@ -1,7 +1,6 @@ package command import ( - "github.com/seaweedfs/fuse" ) diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index a50cfa192..f8e0785f6 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -57,7 +57,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -77,7 +77,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index eacfb5814..0580acf76 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -78,7 +78,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil { ms.vgLock.Unlock() return nil, fmt.Errorf("Cannot grow volume group! %v", err) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e9eb32cca..cde583560 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -3,9 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/shell" - "github.com/chrislusf/seaweedfs/weed/wdclient" - "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" @@ -21,10 +18,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/gorilla/mux" "github.com/spf13/viper" + "google.golang.org/grpc" ) type MasterOption struct { @@ -57,7 +57,7 @@ type MasterServer struct { clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation - grpcDialOpiton grpc.DialOption + grpcDialOption grpc.DialOption MasterClient *wdclient.MasterClient } @@ -83,7 +83,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste option: option, preallocateSize: preallocateSize, clientChans: make(map[string]chan *master_pb.VolumeLocation), - grpcDialOpiton: grpcDialOption, + grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), } ms.bounedLeaderChan = make(chan int, 16) @@ -112,7 +112,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste r.HandleFunc("/{fileId}", ms.redirectHandler) } - ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize) ms.startAdminScripts() diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 728c32076..c10f9a5b7 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -108,7 +108,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) ms.vgLock.Lock() defer ms.vgLock.Unlock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Cannot grow volume group! %v", err)) return diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 88c7f1d34..486bf31f4 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -24,7 +24,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) @@ -57,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -73,7 +73,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) } else { - count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) + count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) } } else { err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count")) @@ -119,13 +119,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton) + submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton) + submitForClientHandler(w, r, masterUrl, ms.grpcDialOption) } } } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 88320ed98..53289f1c1 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -25,7 +25,7 @@ type RaftServer struct { *raft.GrpcServer } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { s := &RaftServer{ peers: peers, serverAddr: serverAddr, diff --git a/weed/topology/topology.go b/weed/topology/topology.go index eff8c99a0..ea0769248 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -120,10 +120,10 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil { - return "", 0, nil, fmt.Errorf("failed to find writable volumes for collectio:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) + return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } if datanodes.Length() == 0 { - return "", 0, nil, fmt.Errorf("no writable volumes available for for collectio:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) + return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } fileId, count := t.Sequence.NextFileId(count) return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil From 57e441d67be69311b2428d88dd41b5791ee51f99 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 29 Oct 2019 23:18:01 -0700 Subject: [PATCH 2/2] fix compaction logic --- weed/storage/volume_vacuum.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 73314f022..0e5130572 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -23,7 +23,7 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { - if v.MemoryMapMaxSizeMb > 0 { //it makes no sense to compact in memory + if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory glog.V(3).Infof("Compacting volume %d ...", v.Id) //no need to lock for copy on write //v.accessLock.Lock() @@ -46,7 +46,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error func (v *Volume) Compact2() error { - if v.MemoryMapMaxSizeMb > 0 { //it makes no sense to compact in memory + if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory glog.V(3).Infof("Compact2 volume %d ...", v.Id) v.isCompacting = true @@ -63,7 +63,7 @@ func (v *Volume) Compact2() error { } func (v *Volume) CommitCompact() error { - if v.MemoryMapMaxSizeMb > 0 { //it makes no sense to compact in memory + if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) v.isCompacting = true