Browse Source

metrics: add Prometheus metrics for concurrent upload tracking

Add Prometheus metrics to monitor concurrent upload activity for both
filer and S3 servers. This provides visibility into the upload limiting
feature added in the previous PR.

New Metrics:
- SeaweedFS_filer_in_flight_upload_bytes: Current bytes being uploaded to filer
- SeaweedFS_filer_in_flight_upload_count: Current number of uploads to filer
- SeaweedFS_s3_in_flight_upload_bytes: Current bytes being uploaded to S3
- SeaweedFS_s3_in_flight_upload_count: Current number of uploads to S3

The metrics are updated atomically whenever uploads start or complete,
providing real-time visibility into upload concurrency levels.

This helps operators:
- Monitor upload concurrency in real-time
- Set appropriate limits based on actual usage patterns
- Detect potential bottlenecks or capacity issues
- Track the effectiveness of upload limiting configuration
pull/7555/head
Chris Lu 1 week ago
parent
commit
388a3037ac
  1. 14
      weed/s3api/s3api_circuit_breaker.go
  2. 6
      weed/server/filer_server_handlers.go
  3. 36
      weed/stats/metrics.go

14
weed/s3api/s3api_circuit_breaker.go

@ -3,6 +3,10 @@ package s3api
import (
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -11,9 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"net/http"
"sync"
"sync/atomic"
"github.com/seaweedfs/seaweedfs/weed/stats"
)
type CircuitBreaker struct {
@ -123,10 +125,16 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request),
// Increment counters
atomic.AddInt64(&cb.s3a.inFlightUploads, 1)
atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength)
// Update metrics
stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads)))
stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize)))
defer func() {
// Decrement counters
atomic.AddInt64(&cb.s3a.inFlightUploads, -1)
atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength)
// Update metrics
stats.S3InFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightUploads)))
stats.S3InFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&cb.s3a.inFlightDataSize)))
cb.s3a.inFlightDataLimitCond.Signal()
}()
}

6
weed/server/filer_server_handlers.go

@ -114,10 +114,16 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
// Increment counters
atomic.AddInt64(&fs.inFlightUploads, 1)
atomic.AddInt64(&fs.inFlightDataSize, contentLength)
// Update metrics
stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads)))
stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize)))
defer func() {
// Decrement counters
atomic.AddInt64(&fs.inFlightUploads, -1)
atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
// Update metrics
stats.FilerInFlightUploadCountGauge.Set(float64(atomic.LoadInt64(&fs.inFlightUploads)))
stats.FilerInFlightUploadBytesGauge.Set(float64(atomic.LoadInt64(&fs.inFlightDataSize)))
fs.inFlightDataLimitCond.Signal()
}()

36
weed/stats/metrics.go

@ -167,6 +167,22 @@ var (
Help: "Current number of in-flight requests being handled by filer.",
}, []string{"type"})
FilerInFlightUploadBytesGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "filer",
Name: "in_flight_upload_bytes",
Help: "Current number of bytes being uploaded to filer.",
})
FilerInFlightUploadCountGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "filer",
Name: "in_flight_upload_count",
Help: "Current number of uploads in progress to filer.",
})
FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
@ -371,6 +387,22 @@ var (
Help: "Current number of in-flight requests being handled by s3.",
}, []string{"type"})
S3InFlightUploadBytesGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "in_flight_upload_bytes",
Help: "Current number of bytes being uploaded to S3.",
})
S3InFlightUploadCountGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "in_flight_upload_count",
Help: "Current number of uploads in progress to S3.",
})
S3BucketTrafficReceivedBytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
@ -422,6 +454,8 @@ func init() {
Gather.MustRegister(FilerHandlerCounter)
Gather.MustRegister(FilerRequestHistogram)
Gather.MustRegister(FilerInFlightRequestsGauge)
Gather.MustRegister(FilerInFlightUploadBytesGauge)
Gather.MustRegister(FilerInFlightUploadCountGauge)
Gather.MustRegister(FilerStoreCounter)
Gather.MustRegister(FilerStoreHistogram)
Gather.MustRegister(FilerSyncOffsetGauge)
@ -450,6 +484,8 @@ func init() {
Gather.MustRegister(S3HandlerCounter)
Gather.MustRegister(S3RequestHistogram)
Gather.MustRegister(S3InFlightRequestsGauge)
Gather.MustRegister(S3InFlightUploadBytesGauge)
Gather.MustRegister(S3InFlightUploadCountGauge)
Gather.MustRegister(S3TimeToFirstByteHistogram)
Gather.MustRegister(S3BucketTrafficReceivedBytesCounter)
Gather.MustRegister(S3BucketTrafficSentBytesCounter)

Loading…
Cancel
Save