
Merge branch 'master' into mq-subscribe

chrislu, 8 months ago
commit 20666bdf81
Changed files:

 1. weed/server/master_grpc_server_assign.go (2 lines changed)
 2. weed/server/master_grpc_server_volume.go (2 lines changed)
 3. weed/server/master_server.go (27 lines changed)
 4. weed/server/master_server_handlers.go (2 lines changed)
 5. weed/stats/metrics.go (6 lines changed)
 6. weed/storage/erasure_coding/ec_shard.go (4 lines changed)
 7. weed/storage/volume.go (2 lines changed)
 8. weed/storage/volume_loading.go (2 lines changed)
 9. weed/storage/volume_vacuum.go (2 lines changed)
10. weed/topology/volume_growth.go (34 lines changed)
11. weed/topology/volume_growth_test.go (4 lines changed)
12. weed/topology/volume_layout.go (6 lines changed)

weed/server/master_grpc_server_assign.go (2 lines changed)

@@ -85,7 +85,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
 		return nil, fmt.Errorf("no free volumes left for " + option.String())
 	}
 	vl.AddGrowRequest()
-	ms.vgCh <- &topology.VolumeGrowRequest{
+	ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
 		Option: option,
 		Count:  int(req.WritableVolumeCount),
 	}

weed/server/master_grpc_server_volume.go (2 lines changed)

@@ -22,7 +22,7 @@ func (ms *MasterServer) ProcessGrowRequest() {
 	go func() {
 		filter := sync.Map{}
 		for {
-			req, ok := <-ms.vgCh
+			req, ok := <-ms.volumeGrowthRequestChan
 			if !ok {
 				break
 			}
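Together with the Assign and dirAssignHandler hunks, this is a pure rename: the terse vgCh becomes volumeGrowthRequestChan. The channel is a buffered producer/consumer queue: request handlers enqueue *topology.VolumeGrowRequest values, and the ProcessGrowRequest goroutine drains them. A minimal, self-contained sketch of that pattern, with stand-in types rather than the SeaweedFS ones:

package main

import "fmt"

// GrowRequest stands in for topology.VolumeGrowRequest.
type GrowRequest struct {
	Collection string
	Count      int
}

func main() {
	// Buffered like the real channel: make(chan *topology.VolumeGrowRequest, 1<<6).
	volumeGrowthRequestChan := make(chan *GrowRequest, 1<<6)

	done := make(chan struct{})
	go func() { // consumer, in the spirit of ProcessGrowRequest
		defer close(done)
		for req := range volumeGrowthRequestChan {
			fmt.Printf("grow %d volumes for collection %q\n", req.Count, req.Collection)
		}
	}()

	// producer, in the spirit of the assign handlers
	volumeGrowthRequestChan <- &GrowRequest{Collection: "test", Count: 2}
	close(volumeGrowthRequestChan)
	<-done
}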

weed/server/master_server.go (27 lines changed)

@@ -60,8 +60,8 @@ type MasterServer struct {
 	preallocateSize int64

 	Topo *topology.Topology
-	vg   *topology.VolumeGrowth
-	vgCh chan *topology.VolumeGrowRequest
+	vg                      *topology.VolumeGrowth
+	volumeGrowthRequestChan chan *topology.VolumeGrowRequest

 	boundedLeaderChan chan int

@@ -97,6 +97,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
 	v.SetDefault("master.volume_growth.copy_3", 3)
 	v.SetDefault("master.volume_growth.copy_other", 1)
 	v.SetDefault("master.volume_growth.threshold", 0.9)
+	topology.VolumeGrowStrategy.Copy1Count = v.GetInt("master.volume_growth.copy_1")
+	topology.VolumeGrowStrategy.Copy2Count = v.GetInt("master.volume_growth.copy_2")
+	topology.VolumeGrowStrategy.Copy3Count = v.GetInt("master.volume_growth.copy_3")
+	topology.VolumeGrowStrategy.CopyOtherCount = v.GetInt("master.volume_growth.copy_other")
+	topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")

 	var preallocateSize int64
 	if option.VolumePreallocate {

@@ -105,14 +110,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
 	grpcDialOption := security.LoadClientTLS(v, "grpc.master")
 	ms := &MasterServer{
-		option:          option,
-		preallocateSize: preallocateSize,
-		vgCh:            make(chan *topology.VolumeGrowRequest, 1<<6),
-		clientChans:     make(map[string]chan *master_pb.KeepConnectedResponse),
-		grpcDialOption:  grpcDialOption,
-		MasterClient:    wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
-		adminLocks:      NewAdminLocks(),
-		Cluster:         cluster.NewCluster(),
+		option:                  option,
+		preallocateSize:         preallocateSize,
+		volumeGrowthRequestChan: make(chan *topology.VolumeGrowRequest, 1<<6),
+		clientChans:             make(map[string]chan *master_pb.KeepConnectedResponse),
+		grpcDialOption:          grpcDialOption,
+		MasterClient:            wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
+		adminLocks:              NewAdminLocks(),
+		Cluster:                 cluster.NewCluster(),
 	}
 	ms.boundedLeaderChan = make(chan int, 16)

@@ -151,7 +156,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
 	ms.Topo.StartRefreshWritableVolumes(
 		ms.grpcDialOption,
 		ms.option.GarbageThreshold,
-		v.GetFloat64("master.volume_growth.threshold"),
+		topology.VolumeGrowStrategy.Threshold,
 		ms.preallocateSize,
 	)
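The substantive change in this file is the new block in NewMasterServer: the master now copies the master.volume_growth.* settings out of viper into the package-level topology.VolumeGrowStrategy once at startup, so later reads (including the StartRefreshWritableVolumes call, which previously re-queried viper) hit plain struct fields. A sketch of that read-once pattern, using the real spf13/viper API but illustrative names otherwise:

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// growStrategy mirrors topology.volumeGrowthStrategy: package-level,
// mutable settings with compiled-in defaults.
var growStrategy = struct {
	Copy1Count int
	Threshold  float64
}{Copy1Count: 7, Threshold: 0.9}

func main() {
	v := viper.New()
	v.SetDefault("master.volume_growth.copy_1", 7)
	v.SetDefault("master.volume_growth.threshold", 0.9)

	// Copy configuration into the strategy once; hot paths then read
	// plain fields instead of calling back into viper per request.
	growStrategy.Copy1Count = v.GetInt("master.volume_growth.copy_1")
	growStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")

	fmt.Printf("copy_1=%d threshold=%.1f\n", growStrategy.Copy1Count, growStrategy.Threshold)
}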

weed/server/master_server_handlers.go (2 lines changed)

@@ -136,7 +136,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 	vl.AddGrowRequest()
-	ms.vgCh <- &topology.VolumeGrowRequest{
+	ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
 		Option: option,
 		Count:  writableVolumeCount,
 	}

weed/stats/metrics.go (6 lines changed)

@@ -194,7 +194,7 @@ var (
 			Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
 		}, []string{"type"})

-	VolumeServerVolumeCounter = prometheus.NewGaugeVec(
+	VolumeServerVolumeGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: Namespace,
 			Subsystem: "volumeServer",

@@ -291,7 +291,7 @@ func init() {
 	Gather.MustRegister(VolumeServerVacuumingCompactCounter)
 	Gather.MustRegister(VolumeServerVacuumingCommitCounter)
 	Gather.MustRegister(VolumeServerVacuumingHistogram)
-	Gather.MustRegister(VolumeServerVolumeCounter)
+	Gather.MustRegister(VolumeServerVolumeGauge)
 	Gather.MustRegister(VolumeServerMaxVolumeCounter)
 	Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
 	Gather.MustRegister(VolumeServerDiskSizeGauge)

@@ -354,5 +354,5 @@ func DeleteCollectionMetrics(collection string) {
 	for _, volume_type := range readOnlyVolumeTypes {
 		VolumeServerReadOnlyVolumeGauge.DeleteLabelValues(collection, volume_type)
 	}
-	VolumeServerVolumeCounter.DeleteLabelValues(collection, "volume")
+	VolumeServerVolumeGauge.DeleteLabelValues(collection, "volume")
 }
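The rename fixes a misleading name rather than behavior: the metric was already a prometheus.GaugeVec, and a gauge is the right type here because the number of loaded volumes moves in both directions (Inc on load, Dec on close or destroy), whereas a Prometheus counter is monotonic. A small sketch of the gauge lifecycle that the storage files below rely on, with illustrative names rather than the SeaweedFS registry:

package main

import "github.com/prometheus/client_golang/prometheus"

// A gauge can move both ways, matching a "currently loaded volumes"
// metric; a counter could only ever increase.
var volumeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "SeaweedFS",
		Subsystem: "volumeServer",
		Name:      "volumes",
		Help:      "Number of volumes or EC shards.",
	}, []string{"collection", "type"})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(volumeGauge)

	volumeGauge.WithLabelValues("test", "volume").Inc() // volume loaded
	volumeGauge.WithLabelValues("test", "volume").Dec() // volume closed
	volumeGauge.DeleteLabelValues("test", "volume")     // collection dropped
}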

weed/storage/erasure_coding/ec_shard.go (4 lines changed)

@@ -44,7 +44,7 @@ func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string
 	}
 	v.ecdFileSize = ecdFi.Size()

-	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "ec_shards").Inc()
+	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "ec_shards").Inc()

 	return
 }

@@ -88,7 +88,7 @@ func (shard *EcVolumeShard) Close() {
 func (shard *EcVolumeShard) Destroy() {
 	os.Remove(shard.FileName() + ToExt(int(shard.ShardId)))
-	stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec()
+	stats.VolumeServerVolumeGauge.WithLabelValues(shard.Collection, "ec_shards").Dec()
 }

 func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {

weed/storage/volume.go (2 lines changed)

@@ -246,7 +246,7 @@ func (v *Volume) doClose() {
 			glog.Warningf("Volume Close fail to sync volume %d", v.Id)
 		}
 		v.DataBackend = nil
-		stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
+		stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec()
 	}
 }

weed/storage/volume_loading.go (2 lines changed)

@@ -201,7 +201,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
 		}
 	}

-	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc()

 	if err == nil {
 		hasLoadedVolume = true

weed/storage/volume_vacuum.go (2 lines changed)

@@ -124,7 +124,7 @@ func (v *Volume) CommitCompact() error {
 		}
 	}
 	v.DataBackend = nil
-	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
+	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Dec()

 	var e error
 	if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {

weed/topology/volume_growth.go (34 lines changed)

@@ -15,7 +15,6 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 /*

@@ -31,6 +30,24 @@ type VolumeGrowRequest struct {
 	Count  int
 }

+type volumeGrowthStrategy struct {
+	Copy1Count     int
+	Copy2Count     int
+	Copy3Count     int
+	CopyOtherCount int
+	Threshold      float64
+}
+
+var (
+	VolumeGrowStrategy = volumeGrowthStrategy{
+		Copy1Count:     7,
+		Copy2Count:     6,
+		Copy3Count:     3,
+		CopyOtherCount: 1,
+		Threshold:      0.9,
+	}
+)
+
 type VolumeGrowOption struct {
 	Collection       string                        `json:"collection,omitempty"`
 	ReplicaPlacement *super_block.ReplicaPlacement `json:"replication,omitempty"`

@@ -52,11 +69,6 @@ func (o *VolumeGrowOption) String() string {
 	return string(blob)
 }

-func (o *VolumeGrowOption) Threshold() float64 {
-	v := util.GetViper()
-	return v.GetFloat64("master.volume_growth.threshold")
-}
-
 func NewDefaultVolumeGrowth() *VolumeGrowth {
 	return &VolumeGrowth{}
 }

@@ -64,16 +76,14 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
 // one replication type may need rp.GetCopyCount() actual volumes
 // given copyCount, how many logical volumes to create
 func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
-	v := util.GetViper()
 	switch copyCount {
-	case 1:
-		count = v.GetInt("master.volume_growth.copy_1")
+	case 1: count = VolumeGrowStrategy.Copy1Count
 	case 2:
-		count = v.GetInt("master.volume_growth.copy_2")
+		count = VolumeGrowStrategy.Copy2Count
 	case 3:
-		count = v.GetInt("master.volume_growth.copy_3")
+		count = VolumeGrowStrategy.Copy3Count
 	default:
-		count = v.GetInt("master.volume_growth.copy_other")
+		count = VolumeGrowStrategy.CopyOtherCount
 	}
 	return
 }
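Net effect: the per-call util.GetViper() lookups are gone, replaced by reads of the exported VolumeGrowStrategy, whose compiled-in defaults match the viper defaults set in master_server.go. With those defaults, findVolumeCount maps a replication copy count to a grow batch size as in this toy stand-in (not the SeaweedFS function itself):

package main

import "fmt"

// Toy equivalent of findVolumeCount with the default VolumeGrowStrategy
// values above: fewer copies per volume means growing more volumes at once.
func findVolumeCount(copyCount int) int {
	switch copyCount {
	case 1:
		return 7 // Copy1Count
	case 2:
		return 6 // Copy2Count
	case 3:
		return 3 // Copy3Count
	default:
		return 1 // CopyOtherCount
	}
}

func main() {
	for _, c := range []int{1, 2, 3, 4} {
		fmt.Printf("copyCount=%d -> grow %d logical volumes\n", c, findVolumeCount(c))
	}
}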

weed/topology/volume_growth_test.go (4 lines changed)

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 	"testing"

 	"github.com/seaweedfs/seaweedfs/weed/sequence"

@@ -420,8 +419,7 @@ func TestPickForWrite(t *testing.T) {
 		Rack:       "",
 		DataNode:   "",
 	}
-	v := util.GetViper()
-	v.Set("master.volume_growth.threshold", 0.9)
+	VolumeGrowStrategy.Threshold = 0.9
 	for _, rpStr := range []string{"001", "010", "100"} {
 		rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
 		vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)

weed/topology/volume_layout.go (6 lines changed)

@@ -303,7 +303,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
 		// check whether picked file is close to full
 		dn := locationList.Head()
 		info, _ := dn.GetVolumesById(vid)
-		if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
+		if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
 			shouldGrow = true
 		}
 		return vid, count, locationList.Copy(), shouldGrow, nil

@@ -334,7 +334,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
 			vid, locationList = writableVolumeId, volumeLocationList.Copy()
 			// check whether picked file is close to full
 			info, _ := dn.GetVolumesById(writableVolumeId)
-			if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
+			if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
 				shouldGrow = true
 			}
 			counter = count

@@ -381,7 +381,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (total, a
 			}
 			active++
 			info, _ := dn.GetVolumesById(v)
-			if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
+			if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
 				crowded++
 			}
 		}
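All three call sites apply the same rule: a volume counts as crowded (and may trigger growth) once its size exceeds volumeSizeLimit times the threshold. A worked example of that check, assuming an illustrative 30 GiB size limit and the default threshold of 0.9, which puts the cutoff at 27 GiB:

package main

import "fmt"

func main() {
	// Illustrative numbers, not read from a real topology: a 30 GiB
	// volume size limit with VolumeGrowStrategy.Threshold = 0.9.
	const volumeSizeLimit uint64 = 30 << 30 // bytes
	const threshold = 0.9

	for _, size := range []uint64{20 << 30, 28 << 30} {
		crowded := float64(size) > float64(volumeSizeLimit)*threshold
		fmt.Printf("size=%2d GiB -> crowded=%v\n", size>>30, crowded)
	}
	// Prints: 20 GiB -> false (below 27 GiB), 28 GiB -> true.
}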
