From 388a3037aca17aca06c3247ee9715a2d1d7cc8f3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Nov 2025 12:11:56 -0800 Subject: [PATCH] metrics: add Prometheus metrics for concurrent upload tracking Add Prometheus metrics to monitor concurrent upload activity for both filer and S3 servers. This provides visibility into the upload limiting feature added in the previous PR. New Metrics: - SeaweedFS_filer_in_flight_upload_bytes: Current bytes being uploaded to filer - SeaweedFS_filer_in_flight_upload_count: Current number of uploads to filer - SeaweedFS_s3_in_flight_upload_bytes: Current bytes being uploaded to S3 - SeaweedFS_s3_in_flight_upload_count: Current number of uploads to S3 The metrics are updated atomically whenever uploads start or complete, providing real-time visibility into upload concurrency levels. This helps operators: - Monitor upload concurrency in real-time - Set appropriate limits based on actual usage patterns - Detect potential bottlenecks or capacity issues - Track the effectiveness of upload limiting configuration --- weed/s3api/s3api_circuit_breaker.go | 14 ++++++++--- weed/server/filer_server_handlers.go | 6 +++++ weed/stats/metrics.go | 36 ++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 3c4f55a23..f04b9541a 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -3,6 +3,10 @@ package s3api import ( "errors" "fmt" + "net/http" + "sync" + "sync/atomic" + "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -11,9 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "net/http" - "sync" - "sync/atomic" + "github.com/seaweedfs/seaweedfs/weed/stats" ) type CircuitBreaker struct { @@ -123,10 +125,16 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), // Increment counters atomic.AddInt64(&cb.s3a.inFlightUploads, 1) atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength) + // Update metrics + stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads))) + stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize))) defer func() { // Decrement counters atomic.AddInt64(&cb.s3a.inFlightUploads, -1) atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength) + // Update metrics + stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads))) + stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize))) cb.s3a.inFlightDataLimitCond.Signal() }() } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index a2eab9365..7e2ef64d2 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -114,10 +114,16 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { // Increment counters atomic.AddInt64(&fs.inFlightUploads, 1) atomic.AddInt64(&fs.inFlightDataSize, contentLength) + // Update metrics + stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads))) + stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize))) defer func() { // Decrement counters atomic.AddInt64(&fs.inFlightUploads, -1) atomic.AddInt64(&fs.inFlightDataSize, -contentLength) + // Update metrics + stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads))) + stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize))) fs.inFlightDataLimitCond.Signal() }() diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 95491abf1..914d59d28 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -167,6 +167,22 @@ var ( Help: "Current number of in-flight requests being handled by filer.", }, []string{"type"}) + FilerInFlightUploadBytesGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "in_flight_upload_bytes", + Help: "Current number of bytes being uploaded to filer.", + }) + + FilerInFlightUploadCountGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "in_flight_upload_count", + Help: "Current number of uploads in progress to filer.", + }) + FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -371,6 +387,22 @@ var ( Help: "Current number of in-flight requests being handled by s3.", }, []string{"type"}) + S3InFlightUploadBytesGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "in_flight_upload_bytes", + Help: "Current number of bytes being uploaded to S3.", + }) + + S3InFlightUploadCountGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "in_flight_upload_count", + Help: "Current number of uploads in progress to S3.", + }) + S3BucketTrafficReceivedBytesCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -422,6 +454,8 @@ func init() { Gather.MustRegister(FilerHandlerCounter) Gather.MustRegister(FilerRequestHistogram) Gather.MustRegister(FilerInFlightRequestsGauge) + Gather.MustRegister(FilerInFlightUploadBytesGauge) + Gather.MustRegister(FilerInFlightUploadCountGauge) Gather.MustRegister(FilerStoreCounter) Gather.MustRegister(FilerStoreHistogram) Gather.MustRegister(FilerSyncOffsetGauge) @@ -450,6 +484,8 @@ func init() { Gather.MustRegister(S3HandlerCounter) Gather.MustRegister(S3RequestHistogram) Gather.MustRegister(S3InFlightRequestsGauge) + Gather.MustRegister(S3InFlightUploadBytesGauge) + Gather.MustRegister(S3InFlightUploadCountGauge) Gather.MustRegister(S3TimeToFirstByteHistogram) Gather.MustRegister(S3BucketTrafficReceivedBytesCounter) Gather.MustRegister(S3BucketTrafficSentBytesCounter)