diff --git a/weed/command/server.go b/weed/command/server.go index 9bac2be97..fe10b24f7 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -116,6 +116,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") + serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") diff --git a/weed/command/volume.go b/weed/command/volume.go index 712fa0dce..235eff11b 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -35,31 +35,32 @@ var ( ) type VolumeServerOptions struct { - port *int - publicPort *int - folders []string - folderMaxLimits []int - idxFolder *string - ip *string - publicUrl *string - bindIp *string - masters *string - idleConnectionTimeout *int - dataCenter *string - rack *string - whiteList []string - indexType *string - diskType *string - fixJpgOrientation *bool - readMode *string - cpuProfile *string - memProfile *string - compactionMBPerSecond *int - fileSizeLimitMB *int - concurrentUploadLimitMB *int - pprof *bool - preStopSeconds *int - metricsHttpPort *int + port *int + publicPort *int + folders []string + folderMaxLimits []int + idxFolder *string + ip *string + publicUrl *string + bindIp *string + masters *string + idleConnectionTimeout *int + dataCenter *string + rack *string + whiteList []string + indexType *string + diskType *string + fixJpgOrientation *bool + readMode *string + cpuProfile *string + memProfile *string + compactionMBPerSecond *int + fileSizeLimitMB *int + concurrentUploadLimitMB *int + concurrentDownloadLimitMB *int + pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int enableTcp *bool } @@ -85,7 +86,8 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") - v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size") + v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") @@ -232,6 +234,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, + int64(*v.concurrentDownloadLimitMB)*1024*1024, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 2bc108a23..898c3da12 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv if !hasEcVolume { return nil, fmt.Errorf("volume not found %d", req.VolumeId) } - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil) } else { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) } if err != nil { return nil, err diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 2c445f996..3645ad9c9 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -40,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } else { n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusNotFound, diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go index 2f4fab96a..349d10097 100644 --- a/weed/server/volume_grpc_query.go +++ b/weed/server/volume_grpc_query.go @@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_ n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) return err } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index a0f32700b..034521b4b 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -17,9 +17,12 @@ import ( ) type VolumeServer struct { - inFlightUploadDataSize int64 - concurrentUploadLimit int64 - inFlightUploadDataLimitCond *sync.Cond + inFlightUploadDataSize int64 + inFlightDownloadDataSize int64 + concurrentUploadLimit int64 + concurrentDownloadLimit int64 + inFlightUploadDataLimitCond *sync.Cond + inFlightDownloadDataLimitCond *sync.Cond SeedMasterNodes []string currentMaster string @@ -54,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, compactionMBPerSecond int, fileSizeLimitMB int, concurrentUploadLimit int64, + concurrentDownloadLimit int64, ) *VolumeServer { v := util.GetViper() @@ -67,19 +71,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadMode: readMode, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), - compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, - fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, - isHeartbeating: true, - stopChan: make(chan bool), - inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), - concurrentUploadLimit: concurrentUploadLimit, + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadMode: readMode, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), + compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, + fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + isHeartbeating: true, + stopChan: make(chan bool), + inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), + inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)), + concurrentUploadLimit: concurrentUploadLimit, + concurrentDownloadLimit: concurrentDownloadLimit, } vs.SeedMasterNodes = masterNodes diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 917c7cd25..ed7807bb8 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET", "HEAD": stats.ReadRequest() + vs.inFlightDownloadDataLimitCond.L.Lock() + for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit { + glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit) + vs.inFlightDownloadDataLimitCond.Wait() + } vs.GetOrHeadHandler(w, r) case "DELETE": stats.DeleteRequest() @@ -47,7 +52,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque contentLength := getContentLength(r) vs.inFlightUploadDataLimitCond.L.Lock() for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { - glog.V(4).Infof("wait because inflight data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) + glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) vs.inFlightUploadDataLimitCond.Wait() } atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index cbad9c770..ae3c0b53f 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "mime" "net/http" @@ -12,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -123,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } var count int + var needleSize types.Size + onReadSizeFn := func(size types.Size) { + needleSize = size + atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize)) + vs.inFlightDownloadDataLimitCond.L.Unlock() + } if hasVolume { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn) } else if hasEcVolume { - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn) } + defer func() { + atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize)) + vs.inFlightDownloadDataLimitCond.Signal() + }() + if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { glog.V(4).Infof("read needle: %v", err) // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 58212e8ff..aeb7d6e65 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -108,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } - _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil) + _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) if ok != nil { m := make(map[string]uint32) m["size"] = 0 diff --git a/weed/storage/store.go b/weed/storage/store.go index cda1e196b..c407a6081 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -356,9 +356,9 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, e return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port) } -func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) { +func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) { if v := s.findVolume(i); v != nil { - return v.readNeedle(n, readOption) + return v.readNeedle(n, readOption, onReadSizeFn) } return 0, fmt.Errorf("volume %d not found", i) } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 9702fdd50..6ba7237e2 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -121,7 +121,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) { } } -func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { +func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) { for _, location := range s.Locations { if localEcVolume, found := location.FindEcVolume(vid); found { @@ -133,6 +133,10 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e return 0, ErrorDeleted } + if onReadSizeFn != nil { + onReadSizeFn(size) + } + glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals) if len(intervals) > 1 { diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index 4a75fb20b..6c10af3c5 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -14,7 +14,7 @@ import ( func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { - count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n) + count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil) if err != nil { return 0, err diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index f689eeec0..9751b56ae 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -13,7 +13,7 @@ import ( ) // read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) { +func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() @@ -33,6 +33,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro if readSize == 0 { return 0, nil } + if onReadSizeFn != nil { + onReadSizeFn(readSize) + } err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) if err == needle.ErrorSizeMismatch && OffsetSize == 4 { err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index cd5a4f430..89fff4b2b 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -113,7 +113,7 @@ func TestCompaction(t *testing.T) { } n := newEmptyNeedle(uint64(i)) - size, err := v.readNeedle(n, nil) + size, err := v.readNeedle(n, nil, nil) if err != nil { t.Fatalf("read file %d: %v", i, err) }