Browse Source

volume: support concurrent download data size limit

pull/2252/head
Chris Lu 4 years ago
parent
commit
734c980040
  1. 1
      weed/command/server.go
  2. 5
      weed/command/volume.go
  3. 4
      weed/server/volume_grpc_admin.go
  4. 2
      weed/server/volume_grpc_batch_delete.go
  5. 2
      weed/server/volume_grpc_query.go
  6. 6
      weed/server/volume_server.go
  7. 7
      weed/server/volume_server_handlers.go
  8. 17
      weed/server/volume_server_handlers_read.go
  9. 2
      weed/server/volume_server_handlers_write.go
  10. 4
      weed/storage/store.go
  11. 6
      weed/storage/store_ec.go
  12. 2
      weed/storage/store_ec_delete.go
  13. 5
      weed/storage/volume_read.go
  14. 2
      weed/storage/volume_vacuum_test.go

1
weed/command/server.go

@ -116,6 +116,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")

5
weed/command/volume.go

@ -57,6 +57,7 @@ type VolumeServerOptions struct {
compactionMBPerSecond *int compactionMBPerSecond *int
fileSizeLimitMB *int fileSizeLimitMB *int
concurrentUploadLimitMB *int concurrentUploadLimitMB *int
concurrentDownloadLimitMB *int
pprof *bool pprof *bool
preStopSeconds *int preStopSeconds *int
metricsHttpPort *int metricsHttpPort *int
@ -85,7 +86,8 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
@ -232,6 +234,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond, *v.compactionMBPerSecond,
*v.fileSizeLimitMB, *v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentUploadLimitMB)*1024*1024,
int64(*v.concurrentDownloadLimitMB)*1024*1024,
) )
// starting grpc server // starting grpc server
grpcS := v.startGrpcService(volumeServer) grpcS := v.startGrpcService(volumeServer)

4
weed/server/volume_grpc_admin.go

@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
if !hasEcVolume { if !hasEcVolume {
return nil, fmt.Errorf("volume not found %d", req.VolumeId) return nil, fmt.Errorf("volume not found %d", req.VolumeId)
} }
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
} else { } else {
count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
} }
if err != nil { if err != nil {
return nil, err return nil, err

2
weed/server/volume_grpc_batch_delete.go

@ -40,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else { } else {
n.ParsePath(id_cookie) n.ParsePath(id_cookie)
cookie := n.Cookie cookie := n.Cookie
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid, FileId: fid,
Status: http.StatusNotFound, Status: http.StatusNotFound,

2
weed/server/volume_grpc_query.go

@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie) n.ParsePath(id_cookie)
cookie := n.Cookie cookie := n.Cookie
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err return err
} }

6
weed/server/volume_server.go

@ -18,8 +18,11 @@ import (
type VolumeServer struct { type VolumeServer struct {
inFlightUploadDataSize int64 inFlightUploadDataSize int64
inFlightDownloadDataSize int64
concurrentUploadLimit int64 concurrentUploadLimit int64
concurrentDownloadLimit int64
inFlightUploadDataLimitCond *sync.Cond inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
SeedMasterNodes []string SeedMasterNodes []string
currentMaster string currentMaster string
@ -54,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
compactionMBPerSecond int, compactionMBPerSecond int,
fileSizeLimitMB int, fileSizeLimitMB int,
concurrentUploadLimit int64, concurrentUploadLimit int64,
concurrentDownloadLimit int64,
) *VolumeServer { ) *VolumeServer {
v := util.GetViper() v := util.GetViper()
@ -79,7 +83,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
isHeartbeating: true, isHeartbeating: true,
stopChan: make(chan bool), stopChan: make(chan bool),
inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
concurrentUploadLimit: concurrentUploadLimit, concurrentUploadLimit: concurrentUploadLimit,
concurrentDownloadLimit: concurrentDownloadLimit,
} }
vs.SeedMasterNodes = masterNodes vs.SeedMasterNodes = masterNodes

7
weed/server/volume_server_handlers.go

@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
switch r.Method { switch r.Method {
case "GET", "HEAD": case "GET", "HEAD":
stats.ReadRequest() stats.ReadRequest()
vs.inFlightDownloadDataLimitCond.L.Lock()
for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
vs.inFlightDownloadDataLimitCond.Wait()
}
vs.GetOrHeadHandler(w, r) vs.GetOrHeadHandler(w, r)
case "DELETE": case "DELETE":
stats.DeleteRequest() stats.DeleteRequest()
@ -47,7 +52,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
contentLength := getContentLength(r) contentLength := getContentLength(r)
vs.inFlightUploadDataLimitCond.L.Lock() vs.inFlightUploadDataLimitCond.L.Lock()
for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
glog.V(4).Infof("wait because inflight data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
vs.inFlightUploadDataLimitCond.Wait() vs.inFlightUploadDataLimitCond.Wait()
} }
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)

17
weed/server/volume_server_handlers_read.go

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io" "io"
"mime" "mime"
"net/http" "net/http"
@ -12,6 +13,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -123,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} }
var count int var count int
var needleSize types.Size
onReadSizeFn := func(size types.Size) {
needleSize = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
vs.inFlightDownloadDataLimitCond.L.Unlock()
}
if hasVolume { if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn)
} else if hasEcVolume { } else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn)
} }
defer func() {
atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize))
vs.inFlightDownloadDataLimitCond.Signal()
}()
if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
glog.V(4).Infof("read needle: %v", err) glog.V(4).Infof("read needle: %v", err)
// start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request

2
weed/server/volume_server_handlers_write.go

@ -108,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
if ok != nil { if ok != nil {
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = 0 m["size"] = 0

4
weed/storage/store.go

@ -356,9 +356,9 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, e
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port) return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
} }
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) {
func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
if v := s.findVolume(i); v != nil { if v := s.findVolume(i); v != nil {
return v.readNeedle(n, readOption)
return v.readNeedle(n, readOption, onReadSizeFn)
} }
return 0, fmt.Errorf("volume %d not found", i) return 0, fmt.Errorf("volume %d not found", i)
} }

6
weed/storage/store_ec.go

@ -121,7 +121,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
} }
} }
func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) {
for _, location := range s.Locations { for _, location := range s.Locations {
if localEcVolume, found := location.FindEcVolume(vid); found { if localEcVolume, found := location.FindEcVolume(vid); found {
@ -133,6 +133,10 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted return 0, ErrorDeleted
} }
if onReadSizeFn != nil {
onReadSizeFn(size)
}
glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals) glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
if len(intervals) > 1 { if len(intervals) > 1 {

2
weed/storage/store_ec_delete.go

@ -14,7 +14,7 @@ import (
func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
if err != nil { if err != nil {
return 0, err return 0, err

5
weed/storage/volume_read.go

@ -13,7 +13,7 @@ import (
) )
// read fills in Needle content by looking up n.Id from NeedleMapper // read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
v.dataFileAccessLock.RLock() v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock() defer v.dataFileAccessLock.RUnlock()
@ -33,6 +33,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if readSize == 0 { if readSize == 0 {
return 0, nil return 0, nil
} }
if onReadSizeFn != nil {
onReadSizeFn(readSize)
}
err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 { if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())

2
weed/storage/volume_vacuum_test.go

@ -113,7 +113,7 @@ func TestCompaction(t *testing.T) {
} }
n := newEmptyNeedle(uint64(i)) n := newEmptyNeedle(uint64(i))
size, err := v.readNeedle(n, nil)
size, err := v.readNeedle(n, nil, nil)
if err != nil { if err != nil {
t.Fatalf("read file %d: %v", i, err) t.Fatalf("read file %d: %v", i, err)
} }

Loading…
Cancel
Save