diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 3c4f55a23..f04b9541a 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -3,6 +3,10 @@ package s3api import ( "errors" "fmt" + "net/http" + "sync" + "sync/atomic" + "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -11,9 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "net/http" - "sync" - "sync/atomic" + "github.com/seaweedfs/seaweedfs/weed/stats" ) type CircuitBreaker struct { @@ -123,10 +125,16 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), // Increment counters atomic.AddInt64(&cb.s3a.inFlightUploads, 1) atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength) + // Update metrics + stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads))) + stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize))) defer func() { // Decrement counters atomic.AddInt64(&cb.s3a.inFlightUploads, -1) atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength) + // Update metrics + stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads))) + stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize))) cb.s3a.inFlightDataLimitCond.Signal() }() } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index a2eab9365..7e2ef64d2 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -114,10 +114,16 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { // Increment counters atomic.AddInt64(&fs.inFlightUploads, 1) atomic.AddInt64(&fs.inFlightDataSize, contentLength) + // Update metrics + stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads))) + stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize))) defer func() { // Decrement counters atomic.AddInt64(&fs.inFlightUploads, -1) atomic.AddInt64(&fs.inFlightDataSize, -contentLength) + // Update metrics + stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads))) + stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize))) fs.inFlightDataLimitCond.Signal() }() diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 95491abf1..914d59d28 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -167,6 +167,22 @@ var ( Help: "Current number of in-flight requests being handled by filer.", }, []string{"type"}) + FilerInFlightUploadBytesGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "in_flight_upload_bytes", + Help: "Current number of bytes being uploaded to filer.", + }) + + FilerInFlightUploadCountGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "in_flight_upload_count", + Help: "Current number of uploads in progress to filer.", + }) + FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -371,6 +387,22 @@ var ( Help: "Current number of in-flight requests being handled by s3.", }, []string{"type"}) + S3InFlightUploadBytesGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "in_flight_upload_bytes", + Help: "Current number of bytes being uploaded to S3.", + }) + + S3InFlightUploadCountGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "in_flight_upload_count", + Help: "Current number of uploads in progress to S3.", + }) + S3BucketTrafficReceivedBytesCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, @@ -422,6 +454,8 @@ func init() { Gather.MustRegister(FilerHandlerCounter) Gather.MustRegister(FilerRequestHistogram) Gather.MustRegister(FilerInFlightRequestsGauge) + Gather.MustRegister(FilerInFlightUploadBytesGauge) + Gather.MustRegister(FilerInFlightUploadCountGauge) Gather.MustRegister(FilerStoreCounter) Gather.MustRegister(FilerStoreHistogram) Gather.MustRegister(FilerSyncOffsetGauge) @@ -450,6 +484,8 @@ func init() { Gather.MustRegister(S3HandlerCounter) Gather.MustRegister(S3RequestHistogram) Gather.MustRegister(S3InFlightRequestsGauge) + Gather.MustRegister(S3InFlightUploadBytesGauge) + Gather.MustRegister(S3InFlightUploadCountGauge) Gather.MustRegister(S3TimeToFirstByteHistogram) Gather.MustRegister(S3BucketTrafficReceivedBytesCounter) Gather.MustRegister(S3BucketTrafficSentBytesCounter)