diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go index 5839a6a73..7455c9ea4 100644 --- a/weed/server/master_grpc_server_assign.go +++ b/weed/server/master_grpc_server_assign.go @@ -85,7 +85,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest return nil, fmt.Errorf("no free volumes left for " + option.String()) } vl.AddGrowRequest() - ms.vgCh <- &topology.VolumeGrowRequest{ + ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ Option: option, Count: int(req.WritableVolumeCount), } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index a344e5221..503da7fd4 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -22,7 +22,7 @@ func (ms *MasterServer) ProcessGrowRequest() { go func() { filter := sync.Map{} for { - req, ok := <-ms.vgCh + req, ok := <-ms.volumeGrowthRequestChan if !ok { break } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9a5313a10..f89c9598f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -60,8 +60,8 @@ type MasterServer struct { preallocateSize int64 Topo *topology.Topology - vg *topology.VolumeGrowth - vgCh chan *topology.VolumeGrowRequest + vg *topology.VolumeGrowth + volumeGrowthRequestChan chan *topology.VolumeGrowRequest boundedLeaderChan chan int @@ -97,6 +97,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se v.SetDefault("master.volume_growth.copy_3", 3) v.SetDefault("master.volume_growth.copy_other", 1) v.SetDefault("master.volume_growth.threshold", 0.9) + topology.VolumeGrowStrategy.Copy1Count = v.GetInt("master.volume_growth.copy_1") + topology.VolumeGrowStrategy.Copy2Count = v.GetInt("master.volume_growth.copy_2") + topology.VolumeGrowStrategy.Copy3Count = v.GetInt("master.volume_growth.copy_3") + topology.VolumeGrowStrategy.CopyOtherCount = v.GetInt("master.volume_growth.copy_other") + topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold") var preallocateSize int64 if option.VolumePreallocate { @@ -105,14 +110,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se grpcDialOption := security.LoadClientTLS(v, "grpc.master") ms := &MasterServer{ - option: option, - preallocateSize: preallocateSize, - vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), - clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), - grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)), - adminLocks: NewAdminLocks(), - Cluster: cluster.NewCluster(), + option: option, + preallocateSize: preallocateSize, + volumeGrowthRequestChan: make(chan *topology.VolumeGrowRequest, 1<<6), + clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)), + adminLocks: NewAdminLocks(), + Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) @@ -151,7 +156,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.Topo.StartRefreshWritableVolumes( ms.grpcDialOption, ms.option.GarbageThreshold, - v.GetFloat64("master.volume_growth.threshold"), + topology.VolumeGrowStrategy.Threshold, ms.preallocateSize, ) diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 9dc6351a4..e4188420d 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -136,7 +136,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } vl.AddGrowRequest() - ms.vgCh <- &topology.VolumeGrowRequest{ + ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ Option: option, Count: writableVolumeCount, } diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 83391f047..4393c280c 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -194,7 +194,7 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24), }, []string{"type"}) - VolumeServerVolumeCounter = prometheus.NewGaugeVec( + VolumeServerVolumeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: "volumeServer", @@ -291,7 +291,7 @@ func init() { Gather.MustRegister(VolumeServerVacuumingCompactCounter) Gather.MustRegister(VolumeServerVacuumingCommitCounter) Gather.MustRegister(VolumeServerVacuumingHistogram) - Gather.MustRegister(VolumeServerVolumeCounter) + Gather.MustRegister(VolumeServerVolumeGauge) Gather.MustRegister(VolumeServerMaxVolumeCounter) Gather.MustRegister(VolumeServerReadOnlyVolumeGauge) Gather.MustRegister(VolumeServerDiskSizeGauge) @@ -354,5 +354,5 @@ func DeleteCollectionMetrics(collection string) { for _, volume_type := range readOnlyVolumeTypes { VolumeServerReadOnlyVolumeGauge.DeleteLabelValues(collection, volume_type) } - VolumeServerVolumeCounter.DeleteLabelValues(collection, "volume") + VolumeServerVolumeGauge.DeleteLabelValues(collection, "volume") } diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 19ee17636..9fcb11525 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -44,7 +44,7 @@ func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string } v.ecdFileSize = ecdFi.Size() - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "ec_shards").Inc() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "ec_shards").Inc() return } @@ -88,7 +88,7 @@ func (shard *EcVolumeShard) Close() { func (shard *EcVolumeShard) Destroy() { os.Remove(shard.FileName() + ToExt(int(shard.ShardId))) - stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec() + stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Dec() } func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 232930b80..94d1f9a4e 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -246,7 +246,7 @@ func (v *Volume) doClose() { glog.Warningf("Volume Close fail to sync volume %d", v.Id) } v.DataBackend = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec() } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 78dfa6901..6e9b2f623 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -201,7 +201,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc() if err == nil { hasLoadedVolume = true diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index b070fa901..c8098493d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -124,7 +124,7 @@ func (v *Volume) CommitCompact() error { } } v.DataBackend = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() + stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec() var e error if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil { diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index cf5690cec..9885fc2d1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -15,7 +15,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" ) /* @@ -31,6 +30,24 @@ type VolumeGrowRequest struct { Count int } +type volumeGrowthStrategy struct { + Copy1Count int + Copy2Count int + Copy3Count int + CopyOtherCount int + Threshold float64 +} + +var ( + VolumeGrowStrategy = volumeGrowthStrategy{ + Copy1Count: 7, + Copy2Count: 6, + Copy3Count: 3, + CopyOtherCount: 1, + Threshold: 0.9, + } +) + type VolumeGrowOption struct { Collection string `json:"collection,omitempty"` ReplicaPlacement *super_block.ReplicaPlacement `json:"replication,omitempty"` @@ -52,11 +69,6 @@ func (o *VolumeGrowOption) String() string { return string(blob) } -func (o *VolumeGrowOption) Threshold() float64 { - v := util.GetViper() - return v.GetFloat64("master.volume_growth.threshold") -} - func NewDefaultVolumeGrowth() *VolumeGrowth { return &VolumeGrowth{} } @@ -64,16 +76,14 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { // one replication type may need rp.GetCopyCount() actual volumes // given copyCount, how many logical volumes to create func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { - v := util.GetViper() switch copyCount { - case 1: - count = v.GetInt("master.volume_growth.copy_1") + case 1: count = VolumeGrowStrategy.Copy1Count case 2: - count = v.GetInt("master.volume_growth.copy_2") + count = VolumeGrowStrategy.Copy2Count case 3: - count = v.GetInt("master.volume_growth.copy_3") + count = VolumeGrowStrategy.Copy3Count default: - count = v.GetInt("master.volume_growth.copy_other") + count = VolumeGrowStrategy.CopyOtherCount } return } diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 04c5e8aeb..1624ec32a 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" "testing" "github.com/seaweedfs/seaweedfs/weed/sequence" @@ -420,8 +419,7 @@ func TestPickForWrite(t *testing.T) { Rack: "", DataNode: "", } - v := util.GetViper() - v.Set("master.volume_growth.threshold", 0.9) + VolumeGrowStrategy.Threshold = 0.9 for _, rpStr := range []string{"001", "010", "100"} { rp, _ := super_block.NewReplicaPlacementFromString(rpStr) vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType) diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index d04552d35..4467b2dc8 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -303,7 +303,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi // check whether picked file is close to full dn := locationList.Head() info, _ := dn.GetVolumesById(vid) - if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { + if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { shouldGrow = true } return vid, count, locationList.Copy(), shouldGrow, nil @@ -334,7 +334,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi vid, locationList = writableVolumeId, volumeLocationList.Copy() // check whether picked file is close to full info, _ := dn.GetVolumesById(writableVolumeId) - if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { + if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold { shouldGrow = true } counter = count @@ -381,7 +381,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, a } active++ info, _ := dn.GetVolumesById(v) - if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { + if float64(info.Size) > float64(vl.volumeSizeLimit)* VolumeGrowStrategy.Threshold{ crowded++ } }