From 93007c1842679c0ffb644fdef0d9126671ad82d2 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 3 Jul 2025 06:03:49 +0500 Subject: [PATCH] [volume] refactor and add metrics for flight upload and download data limit condition (#6920) * refactor concurrentDownloadLimit * fix loop * fix cmdServer * fix: resolve conversation pr 6920 * Changes logging function (#6919) * updated logging methods for stores * updated logging methods for stores * updated logging methods for filer * updated logging methods for uploader and http_util * updated logging methods for weed server --------- Co-authored-by: akosov * Improve lock ring (#6921) * fix flaky lock ring test * add more tests * fix: build * fix: rm import util/version * fix: serverOptions * refactoring --------- Co-authored-by: Aleksey Kosov Co-authored-by: akosov Co-authored-by: Chris Lu Co-authored-by: chrislu --- weed/command/server.go | 2 + weed/command/volume.go | 11 +- .../filer_server_handlers_write_autochunk.go | 4 +- weed/server/volume_server.go | 5 + weed/server/volume_server_handlers.go | 284 ++++++++++++++---- weed/server/volume_server_handlers_read.go | 157 ++++++---- weed/stats/metrics.go | 36 +++ weed/stats/metrics_names.go | 11 +- weed/util/cond_wait.go | 32 ++ weed/util/version/constants.go | 2 - 10 files changed, 410 insertions(+), 134 deletions(-) create mode 100644 weed/util/cond_wait.go diff --git a/weed/command/server.go b/weed/command/server.go index d0e04f633..9d7626e78 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -143,6 +143,8 @@ func init() { serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") + serverOptions.v.inflightDownloadDataTimeout = cmdServer.Flag.Duration("volume.inflightDownloadDataTimeout", 60*time.Second, "inflight download data wait timeout of volume servers") + serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", true, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") serverOptions.v.readBufferSizeMB = cmdServer.Flag.Int("volume.readBufferSizeMB", 4, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally") diff --git a/weed/command/volume.go b/weed/command/volume.go index c8917819b..97986b500 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -68,10 +68,11 @@ type VolumeServerOptions struct { metricsHttpPort *int metricsHttpIp *string // pulseSeconds *int - inflightUploadDataTimeout *time.Duration - hasSlowRead *bool - readBufferSizeMB *int - ldbTimeout *int64 + inflightUploadDataTimeout *time.Duration + inflightDownloadDataTimeout *time.Duration + hasSlowRead *bool + readBufferSizeMB *int + ldbTimeout *int64 } func init() { @@ -104,6 +105,7 @@ func init() { v.metricsHttpIp = cmdVolume.Flag.String("metricsIp", "", "metrics listen ip. 
If empty, default to same as -ip.bind option.") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") + v.inflightDownloadDataTimeout = cmdVolume.Flag.Duration("inflightDownloadDataTimeout", 60*time.Second, "inflight download data wait timeout of volume servers") v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", true, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") v.readBufferSizeMB = cmdVolume.Flag.Int("readBufferSizeMB", 4, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally.") } @@ -261,6 +263,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentDownloadLimitMB)*1024*1024, *v.inflightUploadDataTimeout, + *v.inflightDownloadDataTimeout, *v.hasSlowRead, *v.readBufferSizeMB, *v.ldbTimeout, diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 7ce3f8466..92ee68796 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -13,8 +13,6 @@ import ( "strings" "time" - "github.com/seaweedfs/seaweedfs/weed/util/version" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -53,7 +51,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if err.Error() == "operation not permitted" { writeJsonError(w, r, http.StatusForbidden, err) } else if strings.HasPrefix(err.Error(), "read input:") || err.Error() == io.ErrUnexpectedEOF.Error() { - writeJsonError(w, r, version.HttpStatusCancelled, err) + writeJsonError(w, r, util.HttpStatusCancelled, err) } else if strings.HasSuffix(err.Error(), "is a file") || strings.HasSuffix(err.Error(), "already exists") { writeJsonError(w, r, http.StatusConflict, err) } else { diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 5c5ebc49a..89414afc9 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -28,6 +28,7 @@ type VolumeServer struct { inFlightUploadDataLimitCond *sync.Cond inFlightDownloadDataLimitCond *sync.Cond inflightUploadDataTimeout time.Duration + inflightDownloadDataTimeout time.Duration hasSlowRead bool readBufferSizeMB int @@ -68,6 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, concurrentUploadLimit int64, concurrentDownloadLimit int64, inflightUploadDataTimeout time.Duration, + inflightDownloadDataTimeout time.Duration, hasSlowRead bool, readBufferSizeMB int, ldbTimeout int64, @@ -133,6 +135,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, publicMux.HandleFunc("/", requestIDMiddleware(vs.publicReadOnlyHandler)) } + stats.VolumeServerConcurrentDownloadLimit.Set(float64(vs.concurrentDownloadLimit)) + stats.VolumeServerConcurrentUploadLimit.Set(float64(vs.concurrentUploadLimit)) + go vs.heartbeat() go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec) diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 3243f3ffd..a42732062 100644 --- a/weed/server/volume_server_handlers.go +++ 
b/weed/server/volume_server_handlers.go @@ -2,13 +2,16 @@ package weed_server import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util/version" "net/http" "strconv" "strings" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/version" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -29,6 +32,219 @@ security settings: */ +// checkDownloadLimit handles download concurrency limiting with timeout and proxy fallback. +// +// Returns: +// - true: Request should proceed with normal processing (limit not exceeded, +// or successfully waited for available capacity) +// - false: Request was already handled by this function (proxied to replica, +// timed out with 429 response, cancelled with 499 response, or +// failed with error response). Caller should NOT continue processing. +// +// Control Flow: +// - No limit configured → return true (proceed normally) +// - Within limit → return true (proceed normally) +// - Over limit + has replicas → proxy to replica, return false (already handled) +// - Over limit + no replicas → wait with timeout: +// - Timeout → send 429 response, return false (already handled) +// - Cancelled → send 499 response, return false (already handled) +// - Capacity available → return true (proceed normally) +func (vs *VolumeServer) checkDownloadLimit(w http.ResponseWriter, r *http.Request) bool { + inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize) + stats.VolumeServerInFlightDownloadSize.Set(float64(inFlightDownloadSize)) + + if vs.concurrentDownloadLimit == 0 || inFlightDownloadSize <= vs.concurrentDownloadLimit { + return true // no limit configured or within limit - proceed normally + } + + stats.VolumeServerHandlerCounter.WithLabelValues(stats.DownloadLimitCond).Inc() + glog.V(4).Infof("request %s wait because inflight download data %d > %d", + r.URL.Path, inFlightDownloadSize, vs.concurrentDownloadLimit) + + // Try to proxy to replica if available + if vs.tryProxyToReplica(w, r) { + return false // handled by proxy + } + + // Wait with timeout + return vs.waitForDownloadSlot(w, r) +} + +// tryProxyToReplica attempts to proxy the request to a replica server if the volume has replication. +// Returns: +// - true: Request was handled (either proxied successfully or failed with error response) +// - false: No proxy available (volume has no replicas or request already proxied) +func (vs *VolumeServer) tryProxyToReplica(w http.ResponseWriter, r *http.Request) bool { + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, err := needle.NewVolumeId(vid) + if err != nil { + glog.V(1).Infof("parsing vid %s: %v", r.URL.Path, err) + w.WriteHeader(http.StatusBadRequest) + return true // handled (with error) + } + + volume := vs.store.GetVolume(volumeId) + if volume != nil && volume.ReplicaPlacement != nil && volume.ReplicaPlacement.HasReplication() && r.URL.Query().Get(reqIsProxied) != "true" { + vs.proxyReqToTargetServer(w, r) + return true // handled by proxy + } + return false // no proxy available +} + +// waitForDownloadSlot waits for available download capacity with timeout. +// +// This function implements a blocking wait mechanism with timeout for download capacity. +// It continuously checks if download capacity becomes available and handles timeout +// and cancellation scenarios appropriately. 
+// +// Returns: +// - true: Download capacity became available, request should proceed +// - false: Request failed (timeout or cancellation), error response already sent +// +// HTTP Status Codes: +// - 429 Too Many Requests: Wait timeout exceeded +// - 499 Client Closed Request: Request cancelled by client +func (vs *VolumeServer) waitForDownloadSlot(w http.ResponseWriter, r *http.Request) bool { + timerDownload := time.NewTimer(vs.inflightDownloadDataTimeout) + defer timerDownload.Stop() + + inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize) + for inFlightDownloadSize > vs.concurrentDownloadLimit { + switch util.WaitWithTimeout(r.Context(), vs.inFlightDownloadDataLimitCond, timerDownload) { + case http.StatusTooManyRequests: + err := fmt.Errorf("request %s because inflight download data %d > %d, and wait timeout", + r.URL.Path, inFlightDownloadSize, vs.concurrentDownloadLimit) + glog.V(1).Infof("too many requests: %v", err) + writeJsonError(w, r, http.StatusTooManyRequests, err) + return false + case util.HttpStatusCancelled: + glog.V(1).Infof("request %s cancelled from %s: %v", r.URL.Path, r.RemoteAddr, r.Context().Err()) + w.WriteHeader(util.HttpStatusCancelled) + return false + } + inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize) + stats.VolumeServerInFlightDownloadSize.Set(float64(inFlightDownloadSize)) + } + return true +} + +// checkUploadLimit handles upload concurrency limiting with timeout. +// +// This function implements upload throttling to prevent overwhelming the volume server +// with too many concurrent uploads. It excludes replication traffic from limits. +// +// Returns: +// - true: Request should proceed with upload processing (no limit, within limit, +// or successfully waited for capacity) +// - false: Request failed (timeout or cancellation), error response already sent +// +// Special Handling: +// - Replication requests (type=replicate) bypass upload limits +// - No upload limit configured (concurrentUploadLimit=0) allows all uploads +func (vs *VolumeServer) checkUploadLimit(w http.ResponseWriter, r *http.Request) bool { + // exclude the replication from the concurrentUploadLimitMB + if vs.concurrentUploadLimit == 0 || r.URL.Query().Get("type") == "replicate" { + return true + } + + inFlightUploadDataSize := atomic.LoadInt64(&vs.inFlightUploadDataSize) + stats.VolumeServerInFlightUploadSize.Set(float64(inFlightUploadDataSize)) + + if inFlightUploadDataSize <= vs.concurrentUploadLimit { + return true + } + + return vs.waitForUploadSlot(w, r) +} + +// waitForUploadSlot waits for available upload capacity with timeout. 
+// +// Returns: +// - true: Upload capacity became available, request should proceed +// - false: Request failed (timeout or cancellation), error response already sent +// +// HTTP Status Codes: +// - 429 Too Many Requests: Wait timeout exceeded +// - 499 Client Closed Request: Request cancelled by client +func (vs *VolumeServer) waitForUploadSlot(w http.ResponseWriter, r *http.Request) bool { + var timerUpload *time.Timer + inFlightUploadDataSize := atomic.LoadInt64(&vs.inFlightUploadDataSize) + + for inFlightUploadDataSize > vs.concurrentUploadLimit { + if timerUpload == nil { + timerUpload = time.NewTimer(vs.inflightUploadDataTimeout) + defer timerUpload.Stop() + } + + glog.V(4).Infof("wait because inflight upload data %d > %d", inFlightUploadDataSize, vs.concurrentUploadLimit) + stats.VolumeServerHandlerCounter.WithLabelValues(stats.UploadLimitCond).Inc() + + switch util.WaitWithTimeout(r.Context(), vs.inFlightUploadDataLimitCond, timerUpload) { + case http.StatusTooManyRequests: + err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", + inFlightUploadDataSize, vs.concurrentUploadLimit) + glog.V(1).Infof("too many requests: %v", err) + writeJsonError(w, r, http.StatusTooManyRequests, err) + return false + case util.HttpStatusCancelled: + glog.V(1).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err()) + writeJsonError(w, r, util.HttpStatusCancelled, r.Context().Err()) + return false + } + + inFlightUploadDataSize = atomic.LoadInt64(&vs.inFlightUploadDataSize) + stats.VolumeServerInFlightUploadSize.Set(float64(inFlightUploadDataSize)) + } + return true +} + +// handleGetRequest processes GET/HEAD requests with download limiting. +// +// This function orchestrates the complete GET/HEAD request handling workflow: +// 1. Records read request statistics +// 2. Applies download concurrency limits with proxy fallback +// 3. Delegates to GetOrHeadHandler for actual file serving (if limits allow) +// +// The download limiting logic may handle the request completely (via proxy, +// timeout, or error), in which case normal file serving is skipped. +func (vs *VolumeServer) handleGetRequest(w http.ResponseWriter, r *http.Request) { + stats.ReadRequest() + if vs.checkDownloadLimit(w, r) { + vs.GetOrHeadHandler(w, r) + } +} + +// handleUploadRequest processes PUT/POST requests with upload limiting. +// +// This function manages the complete upload request workflow: +// 1. Extracts content length from request headers +// 2. Applies upload concurrency limits with timeout handling +// 3. Tracks in-flight upload data size for monitoring +// 4. Delegates to PostHandler for actual file processing +// 5. Ensures proper cleanup of in-flight counters +// +// The upload limiting logic may reject the request with appropriate HTTP +// status codes (429 for timeout, 499 for cancellation). 
+func (vs *VolumeServer) handleUploadRequest(w http.ResponseWriter, r *http.Request) { + contentLength := getContentLength(r) + + if !vs.checkUploadLimit(w, r) { + return + } + + atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) + defer func() { + atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength) + if vs.concurrentUploadLimit != 0 { + vs.inFlightUploadDataLimitCond.Broadcast() + } + }() + + // processes uploads + stats.WriteRequest() + vs.guard.WhiteList(vs.PostHandler)(w, r) +} + func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(r.Method) inFlightGauge.Inc() @@ -41,69 +257,22 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") } + start := time.Now() requestMethod := r.Method defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) { stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc() stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds()) }(start, &requestMethod, statusRecorder) + switch r.Method { case http.MethodGet, http.MethodHead: - stats.ReadRequest() - vs.inFlightDownloadDataLimitCond.L.Lock() - inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize) - for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit { - select { - case <-r.Context().Done(): - glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err()) - w.WriteHeader(version.HttpStatusCancelled) - vs.inFlightDownloadDataLimitCond.L.Unlock() - return - default: - glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit) - vs.inFlightDownloadDataLimitCond.Wait() - } - inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize) - } - vs.inFlightDownloadDataLimitCond.L.Unlock() - vs.GetOrHeadHandler(w, r) + vs.handleGetRequest(w, r) case http.MethodDelete: stats.DeleteRequest() vs.guard.WhiteList(vs.DeleteHandler)(w, r) case http.MethodPut, http.MethodPost: - contentLength := getContentLength(r) - // exclude the replication from the concurrentUploadLimitMB - if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 { - startTime := time.Now() - vs.inFlightUploadDataLimitCond.L.Lock() - inFlightUploadDataSize := atomic.LoadInt64(&vs.inFlightUploadDataSize) - for inFlightUploadDataSize > vs.concurrentUploadLimit { - //wait timeout check - if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) { - vs.inFlightUploadDataLimitCond.L.Unlock() - err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", inFlightUploadDataSize, vs.concurrentUploadLimit) - glog.V(1).Infof("too many requests: %v", err) - writeJsonError(w, r, http.StatusTooManyRequests, err) - return - } - glog.V(4).Infof("wait because inflight upload data %d > %d", inFlightUploadDataSize, vs.concurrentUploadLimit) - vs.inFlightUploadDataLimitCond.Wait() - inFlightUploadDataSize = atomic.LoadInt64(&vs.inFlightUploadDataSize) - } - vs.inFlightUploadDataLimitCond.L.Unlock() - } - atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) - defer func() { - atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength) - if vs.concurrentUploadLimit != 0 { - vs.inFlightUploadDataLimitCond.Signal() - } - }() - - // 
processes uploads
-		stats.WriteRequest()
-		vs.guard.WhiteList(vs.PostHandler)(w, r)
-
+		vs.handleUploadRequest(w, r)
 	case http.MethodOptions:
 		stats.ReadRequest()
 		w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -144,16 +313,7 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req
 
 	switch r.Method {
 	case http.MethodGet, http.MethodHead:
-		stats.ReadRequest()
-		vs.inFlightDownloadDataLimitCond.L.Lock()
-		inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize)
-		for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit {
-			glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit)
-			vs.inFlightDownloadDataLimitCond.Wait()
-			inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize)
-		}
-		vs.inFlightDownloadDataLimitCond.L.Unlock()
-		vs.GetOrHeadHandler(w, r)
+		vs.handleGetRequest(w, r)
 	case http.MethodOptions:
 		stats.ReadRequest()
 		w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 15d639f49..9860d6e9e 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -6,6 +6,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 	"io"
 	"mime"
 	"net/http"
@@ -17,19 +19,18 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/storage/types"
-	"github.com/seaweedfs/seaweedfs/weed/util/mem"
-
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/images"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/storage"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 )
 
+const reqIsProxied = "proxied"
+
 var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
 
 func NotFound(w http.ResponseWriter) {
@@ -42,6 +43,90 @@ func InternalError(w http.ResponseWriter) {
 	w.WriteHeader(http.StatusInternalServerError)
 }
 
+func (vs *VolumeServer) proxyReqToTargetServer(w http.ResponseWriter, r *http.Request) {
+	vid, fid, _, _, _ := parseURLPath(r.URL.Path)
+	volumeId, err := needle.NewVolumeId(vid)
+	if err != nil {
+		glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err)
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
+	if err != nil || len(lookupResult.Locations) <= 0 {
+		glog.V(0).Infoln("lookup error:", err, r.URL.Path)
+		NotFound(w)
+		return
+	}
+	var targetUrl *url.URL
+	location := fmt.Sprintf("%s:%d", vs.store.Ip, vs.store.Port)
+	for _, loc := range lookupResult.Locations {
+		if !strings.Contains(loc.Url, location) {
+			rawURL, _ := util_http.NormalizeUrl(loc.Url)
+			targetUrl, _ = url.Parse(rawURL)
+			break
+		}
+	}
+	if targetUrl == nil {
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.EmptyReadProxyLoc).Inc()
+		glog.Errorf("no proxy target found among locations %+v (self: %s)", lookupResult.Locations, location)
+		NotFound(w)
+		return
+	}
+	if vs.ReadMode == "proxy" {
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadProxyReq).Inc()
+		// proxy client request to target server
+		r.URL.Host = targetUrl.Host
+		r.URL.Scheme = targetUrl.Scheme
+		r.URL.RawQuery = r.URL.Query().Encode() + "&" + reqIsProxied + "=true" // Query() returns a copy, so write the flag back into RawQuery
+		request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
+		if err != nil {
+			glog.V(0).Infof("failed to create http request for url %s: %v", r.URL.String(), err)
+			InternalError(w)
+			return
+		}
+		for k, vv := range r.Header {
+			for _, v := range vv {
+				request.Header.Add(k, v)
+			}
+		}
+
+		response, err := util_http.GetGlobalHttpClient().Do(request)
+		if err != nil {
+			stats.VolumeServerHandlerCounter.WithLabelValues(stats.FailedReadProxyReq).Inc()
+			glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
+			InternalError(w)
+			return
+		}
+		defer util_http.CloseResponse(response)
+		// proxy target response to client
+		for k, vv := range response.Header {
+			if k == "Server" {
+				continue
+			}
+			for _, v := range vv {
+				w.Header().Add(k, v)
+			}
+		}
+		w.WriteHeader(response.StatusCode)
+		buf := mem.Allocate(128 * 1024)
+		defer mem.Free(buf)
+		io.CopyBuffer(w, response.Body, buf)
+		return
+	} else {
+		// redirect
+		stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadRedirectReq).Inc()
+		targetUrl.Path = fmt.Sprintf("%s/%s,%s", targetUrl.Path, vid, fid)
+		arg := url.Values{}
+		if c := r.FormValue("collection"); c != "" {
+			arg.Set("collection", c)
+		}
+		arg.Set(reqIsProxied, "true")
+		targetUrl.RawQuery = arg.Encode()
+		http.Redirect(w, r, targetUrl.String(), http.StatusMovedPermanently)
+		return
+	}
+}
+
 func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
 	n := new(needle.Needle)
 	vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
@@ -73,62 +158,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			NotFound(w)
 			return
 		}
-		lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
-		glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
-		if err != nil || len(lookupResult.Locations) <= 0 {
-			glog.V(0).Infoln("lookup error:", err, r.URL.Path)
-			NotFound(w)
-			return
-		}
-		if vs.ReadMode == "proxy" {
-			// proxy client request to target server
-			rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
-			u, _ := url.Parse(rawURL)
-			r.URL.Host = u.Host
-			r.URL.Scheme = u.Scheme
-			request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
-			if err != nil {
-				glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
-				InternalError(w)
-				return
-			}
-			for k, vv := range r.Header {
-				for _, v := range vv {
-					request.Header.Add(k, v)
-				}
-			}
-
-			response, err := util_http.GetGlobalHttpClient().Do(request)
-			if err != nil {
-				glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
-				InternalError(w)
-				return
-			}
-			defer util_http.CloseResponse(response)
-			// proxy target response to client
-			for k, vv := range response.Header {
-				for _, v := range vv {
-					w.Header().Add(k, v)
-				}
-			}
-			w.WriteHeader(response.StatusCode)
-			buf := mem.Allocate(128 * 1024)
-			defer mem.Free(buf)
-			io.CopyBuffer(w, response.Body, buf)
-			return
-		} else {
-			// redirect
-			rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
-			u, _ := url.Parse(rawURL)
-			u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
-			arg := url.Values{}
-			if c := r.FormValue("collection"); c != "" {
-				arg.Set("collection", c)
-			}
-			u.RawQuery = arg.Encode()
-			http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
-			return
-		}
+		vs.proxyReqToTargetServer(w, r)
+		return
 	}
 
 	cookie := n.Cookie
@@ -145,14 
+176,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) memoryCost = size atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(memoryCost)) } + if hasVolume { count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn) } else if hasEcVolume { count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn) } + defer func() { atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(memoryCost)) - vs.inFlightDownloadDataLimitCond.Signal() + if vs.concurrentDownloadLimit != 0 { + vs.inFlightDownloadDataLimitCond.Broadcast() + } }() if err != nil && err != storage.ErrorDeleted && hasVolume { diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 26164ffc3..2723e253f 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -278,6 +278,38 @@ var ( Help: "Resource usage", }, []string{"name", "type"}) + VolumeServerConcurrentDownloadLimit = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "concurrent_download_limit", + Help: "Limit total concurrent download size.", + }) + + VolumeServerConcurrentUploadLimit = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "concurrent_upload_limit", + Help: "Limit total concurrent upload size.", + }) + + VolumeServerInFlightDownloadSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "in_flight_download_size", + Help: "In flight total download size.", + }) + + VolumeServerInFlightUploadSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "in_flight_upload_size", + Help: "In flight total upload size.", + }) + S3RequestCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -387,6 +419,10 @@ func init() { Gather.MustRegister(VolumeServerReadOnlyVolumeGauge) Gather.MustRegister(VolumeServerDiskSizeGauge) Gather.MustRegister(VolumeServerResourceGauge) + Gather.MustRegister(VolumeServerConcurrentDownloadLimit) + Gather.MustRegister(VolumeServerConcurrentUploadLimit) + Gather.MustRegister(VolumeServerInFlightDownloadSize) + Gather.MustRegister(VolumeServerInFlightUploadSize) Gather.MustRegister(S3RequestCounter) Gather.MustRegister(S3HandlerCounter) diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go index 13f491513..6c2c50ff0 100644 --- a/weed/stats/metrics_names.go +++ b/weed/stats/metrics_names.go @@ -4,8 +4,15 @@ package stats // The naming convention is ErrorSomeThing = "error.some.thing" const ( // volume server - WriteToLocalDisk = "writeToLocalDisk" - WriteToReplicas = "writeToReplicas" + WriteToLocalDisk = "writeToLocalDisk" + WriteToReplicas = "writeToReplicas" + DownloadLimitCond = "downloadLimitCondition" + UploadLimitCond = "uploadLimitCondition" + ReadProxyReq = "readProxyRequest" + ReadRedirectReq = "readRedirectRequest" + EmptyReadProxyLoc = "emptyReadProxyLocaction" + FailedReadProxyReq = "failedReadProxyRequest" + ErrorSizeMismatchOffsetSize = "errorSizeMismatchOffsetSize" ErrorSizeMismatch = "errorSizeMismatch" ErrorCRC = "errorCRC" diff --git a/weed/util/cond_wait.go b/weed/util/cond_wait.go new file mode 100644 index 000000000..43b45de7c --- /dev/null +++ b/weed/util/cond_wait.go @@ -0,0 +1,32 @@ +package util + +import ( + "context" + "net/http" + "sync" + "time" +) + +const HttpStatusCancelled = 499 + +func WaitWithTimeout(ctx context.Context, cond *sync.Cond, timer *time.Timer) int { + waitDone := 
make(chan struct{}) + + go func() { + cond.L.Lock() + defer cond.L.Unlock() + cond.Wait() + defer close(waitDone) + }() + + select { + case <-waitDone: + return http.StatusOK + case <-timer.C: + cond.Broadcast() + return http.StatusTooManyRequests + case <-ctx.Done(): + cond.Broadcast() + return HttpStatusCancelled + } +} diff --git a/weed/util/version/constants.go b/weed/util/version/constants.go index 756c0e2cb..8bef49820 100644 --- a/weed/util/version/constants.go +++ b/weed/util/version/constants.go @@ -6,8 +6,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -const HttpStatusCancelled = 499 - var ( MAJOR_VERSION = int32(3) MINOR_VERSION = int32(92)
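
Reviewer note (not part of the patch): both the upload and download limit paths above rely on the new util.WaitWithTimeout helper added in weed/util/cond_wait.go, which races a sync.Cond wait against a timer and the request context. The self-contained Go sketch below mirrors that helper to show its three outcomes; the local waitWithTimeout function and the demo in main are illustrative assumptions, not SeaweedFS code.

```go
// Reviewer sketch only; mirrors weed/util/cond_wait.go from commit 93007c18, not part of it.
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

const httpStatusCancelled = 499 // same value the patch adds as util.HttpStatusCancelled

// waitWithTimeout lets a helper goroutine block on the condition variable while the
// select races that wait against the timer and the request context. It returns 200
// when the cond is signalled, 429 on timeout, and 499 on cancellation.
func waitWithTimeout(ctx context.Context, cond *sync.Cond, timer *time.Timer) int {
	waitDone := make(chan struct{})
	go func() {
		cond.L.Lock()
		defer cond.L.Unlock()
		cond.Wait()
		close(waitDone)
	}()
	select {
	case <-waitDone:
		return http.StatusOK
	case <-timer.C:
		cond.Broadcast() // wake the helper goroutine so it does not linger
		return http.StatusTooManyRequests
	case <-ctx.Done():
		cond.Broadcast()
		return httpStatusCancelled
	}
}

func main() {
	cond := sync.NewCond(&sync.Mutex{})

	// 1) Capacity frees up: another goroutine broadcasts before the timer fires -> 200.
	go func() { time.Sleep(50 * time.Millisecond); cond.Broadcast() }()
	fmt.Println(waitWithTimeout(context.Background(), cond, time.NewTimer(time.Second)))

	// 2) Nobody broadcasts: the timer wins -> 429.
	fmt.Println(waitWithTimeout(context.Background(), cond, time.NewTimer(50*time.Millisecond)))

	// 3) The request context is cancelled first -> 499.
	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(50 * time.Millisecond); cancel() }()
	fmt.Println(waitWithTimeout(ctx, cond, time.NewTimer(time.Second)))
}
```

The same three return values drive the handler branches in the patch: 200 lets the request proceed, 429 is reported through writeJsonError, and 499 maps to the client-cancelled path.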