Browse Source

[master] do sync grow request only if absolutely necessary (#5821)

* do sync grow request only if absolutely necessary
https://github.com/seaweedfs/seaweedfs/pull/5819

* remove check VolumeGrowStrategy Threshold on PickForWrite

* fix fmt.Errorf
pull/5841/head
Konstantin Lebedev 6 months ago
committed by GitHub
parent
commit
b2ffcdaab2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 8
      weed/server/master_grpc_server_assign.go
  2. 8
      weed/server/master_server_handlers.go
  3. 8
      weed/stats/metrics.go
  4. 4
      weed/topology/topology.go
  5. 36
      weed/topology/volume_layout.go

8
weed/server/master_grpc_server_assign.go

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
"time" "time"
"github.com/seaweedfs/raft" "github.com/seaweedfs/raft"
@ -85,9 +86,8 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
for time.Now().Sub(startTime) < maxTimeout { for time.Now().Sub(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl) fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl)
if shouldGrow && !vl.HasGrowRequest() { if shouldGrow && !vl.HasGrowRequest() {
// if picked volume is almost full, trigger a volume-grow request
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for " + option.String())
if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String())
} }
vl.AddGrowRequest() vl.AddGrowRequest()
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
@ -96,7 +96,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
} }
} }
if err != nil { if err != nil {
// glog.Warningf("PickForWrite %+v: %v", req, err)
stats.MasterPickForWriteErrorCounter.Inc()
lastErr = err lastErr = err
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
continue continue

8
weed/server/master_server_handlers.go

@ -143,11 +143,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
for time.Now().Sub(startTime) < maxTimeout { for time.Now().Sub(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl) fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl)
if shouldGrow && !vl.HasGrowRequest() { if shouldGrow && !vl.HasGrowRequest() {
// if picked volume is almost full, trigger a volume-grow request
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
return
if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String())
} }
vl.AddGrowRequest() vl.AddGrowRequest()
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
@ -156,7 +154,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
} }
} }
if err != nil { if err != nil {
// glog.Warningf("PickForWrite %+v: %v", req, err)
stats.MasterPickForWriteErrorCounter.Inc()
lastErr = err lastErr = err
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
continue continue

8
weed/stats/metrics.go

@ -78,6 +78,14 @@ var (
Help: "Number of volumes in volume layouts", Help: "Number of volumes in volume layouts",
}, []string{"collection", "dataCenter", "type"}) }, []string{"collection", "dataCenter", "type"})
MasterPickForWriteErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "master",
Name: "pick_for_write_error",
Help: "Counter of master pick for write error",
})
MasterLeaderChangeCounter = prometheus.NewCounterVec( MasterLeaderChangeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: Namespace, Namespace: Namespace,

4
weed/topology/topology.go

@ -251,8 +251,8 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption,
if err != nil { if err != nil {
return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
} }
if volumeLocationList.Length() == 0 {
return "", 0, nil, shouldGrow, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
if volumeLocationList == nil || volumeLocationList.Length() == 0 {
return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", noWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
} }
nextFileId := t.Sequence.NextFileId(requestedCount) nextFileId := t.Sequence.NextFileId(requestedCount)
fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String() fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()

36
weed/topology/volume_layout.go

@ -1,7 +1,6 @@
package topology package topology
import ( import (
"errors"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"math/rand" "math/rand"
@ -28,9 +27,10 @@ const (
type volumeState string type volumeState string
const ( const (
readOnlyState volumeState = "ReadOnly"
oversizedState = "Oversized"
crowdedState = "Crowded"
readOnlyState volumeState = "ReadOnly"
oversizedState = "Oversized"
crowdedState = "Crowded"
noWritableVolumes = "No writable volumes"
) )
type stateIndicator func(copyState) bool type stateIndicator func(copyState) bool
@ -108,7 +108,7 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
type VolumeLayout struct { type VolumeLayout struct {
growRequest atomic.Bool growRequest atomic.Bool
lastGrowCount atomic.Uint32 lastGrowCount atomic.Uint32
rp *super_block.ReplicaPlacement
rp *super_block.ReplicaPlacement
ttl *needle.TTL ttl *needle.TTL
diskType types.DiskType diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList vid2location map[needle.VolumeId]*VolumeLocationList
@ -293,23 +293,15 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
lenWriters := len(vl.writables) lenWriters := len(vl.writables)
if lenWriters <= 0 { if lenWriters <= 0 {
//glog.V(0).Infoln("No more writable volumes!")
shouldGrow = true
return 0, 0, nil, shouldGrow, errors.New("No more writable volumes!")
return 0, 0, nil, true, fmt.Errorf("%s in volume layout", noWritableVolumes)
} }
if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" { if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.Intn(lenWriters)] vid := vl.writables[rand.Intn(lenWriters)]
locationList = vl.vid2location[vid] locationList = vl.vid2location[vid]
if locationList != nil && locationList.Length() > 0 {
// check whether picked file is close to full
dn := locationList.Head()
info, _ := dn.GetVolumesById(vid)
if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
shouldGrow = true
}
return vid, count, locationList.Copy(), shouldGrow, nil
if locationList == nil || len(locationList.list) == 0 {
return 0, 0, nil, false, fmt.Errorf("Strangely vid %s is on no machine!", vid.String())
} }
return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!")
return vid, count, locationList.Copy(), false, nil
} }
// clone vl.writables // clone vl.writables
@ -332,17 +324,11 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue continue
} }
vid, locationList = writableVolumeId, volumeLocationList.Copy()
// check whether picked file is close to full
info, _ := dn.GetVolumesById(writableVolumeId)
if float64(info.Size) > float64(vl.volumeSizeLimit)*VolumeGrowStrategy.Threshold {
shouldGrow = true
}
counter = count
vid, locationList, counter = writableVolumeId, volumeLocationList.Copy(), count
return return
} }
} }
return vid, count, locationList, true, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode)
return vid, count, locationList, true, fmt.Errorf("%s in DataCenter:%v Rack:%v DataNode:%v", noWritableVolumes, option.DataCenter, option.Rack, option.DataNode)
} }
func (vl *VolumeLayout) HasGrowRequest() bool { func (vl *VolumeLayout) HasGrowRequest() bool {

Loading…
Cancel
Save