From 656372e6342336e9053aaf7205669036515eaf58 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Wed, 26 Nov 2025 10:04:28 -0800
Subject: [PATCH] s3: add ConcurrentFileUploadLimit option to limit number of concurrent uploads

This adds a new configuration option ConcurrentFileUploadLimit that limits the
number of concurrent file uploads based on file count, complementing the
existing ConcurrentUploadLimit which limits based on total data size. This
addresses an OOM vulnerability where requests with missing/zero Content-Length
headers could bypass the size-based rate limiter.

Changes:
- Add ConcurrentUploadLimit and ConcurrentFileUploadLimit fields to S3ApiServerOption
- Add inFlightDataSize, inFlightUploads, and inFlightDataLimitCond to S3ApiServer
- Add s3a reference to CircuitBreaker for upload limiting
- Enhance CircuitBreaker.Limit() to apply upload limiting for write actions
- Add -concurrentUploadLimitMB and -concurrentFileUploadLimit command line flags
- Add s3.concurrentUploadLimitMB and s3.concurrentFileUploadLimit flags to filer command

The upload limiting is integrated into the existing CircuitBreaker.Limit()
function, avoiding creation of new wrapper functions and reusing the existing
handler registration pattern.

Fixes #7529
---
 weed/command/filer.go               |  2 ++
 weed/command/s3.go                  |  6 ++++
 weed/s3api/s3api_circuit_breaker.go | 43 ++++++++++++++++++++++++
 weed/s3api/s3api_server.go          | 52 +++++++++++++++++------------
 4 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/weed/command/filer.go b/weed/command/filer.go
index f26ff9134..86991a181 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -129,6 +129,8 @@ func init() {
 	filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate")
 	filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
If empty, default to same as -ip.bind option.") filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") + filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") diff --git a/weed/command/s3.go b/weed/command/s3.go index 995d15f8a..61222336b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -57,6 +57,8 @@ type S3Options struct { localSocket *string certProvider certprovider.Provider idleTimeout *int + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int } func init() { @@ -83,6 +85,8 @@ func init() { s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-.sock") s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") + s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") } var cmdS3 = &Command{ @@ -275,6 +279,8 @@ func (s3opt *S3Options) startS3Server() bool { DataCenter: *s3opt.dataCenter, FilerGroup: filerGroup, IamConfig: iamConfigPath, // Advanced IAM config (optional) + ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 2f5e1f580..3c4f55a23 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -21,6 +21,7 @@ type CircuitBreaker struct { Enabled bool counters map[string]*int64 limitations map[string]int64 + s3a *S3ApiServer } func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { @@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { return func(w http.ResponseWriter, r *http.Request) { + // Apply upload limiting for write actions if configured + if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) && + (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) { + + // Get content length, default to 0 if not provided + contentLength := r.ContentLength + if contentLength < 0 { + contentLength = 0 + } + + // Wait until in flight data is less than the limit + cb.s3a.inFlightDataLimitCond.L.Lock() + inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) || + (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + if 
+				if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) {
+					glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit)
+				}
+				if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
+					glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit)
+				}
+				cb.s3a.inFlightDataLimitCond.Wait()
+				inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize)
+				inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads)
+			}
+			cb.s3a.inFlightDataLimitCond.L.Unlock()
+
+			// Increment counters
+			atomic.AddInt64(&cb.s3a.inFlightUploads, 1)
+			atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength)
+			defer func() {
+				// Decrement counters
+				atomic.AddInt64(&cb.s3a.inFlightUploads, -1)
+				atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength)
+				cb.s3a.inFlightDataLimitCond.Signal()
+			}()
+		}
+
+		// Apply circuit breaker logic
 		if !cb.Enabled {
 			f(w, r)
 			return
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index dcf3a55f2..a1a3f100b 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"slices"
 	"strings"
+	"sync"
 	"time"

 	"github.com/gorilla/mux"
@@ -48,22 +49,27 @@ type S3ApiServerOption struct {
 	DataCenter string
 	FilerGroup string
 	IamConfig  string // Advanced IAM configuration file path
+	ConcurrentUploadLimit     int64
+	ConcurrentFileUploadLimit int64
 }

 type S3ApiServer struct {
 	s3_pb.UnimplementedSeaweedS3Server
-	option            *S3ApiServerOption
-	iam               *IdentityAccessManagement
-	iamIntegration    *S3IAMIntegration // Advanced IAM integration for JWT authentication
-	cb                *CircuitBreaker
-	randomClientId    int32
-	filerGuard        *security.Guard
-	filerClient       *wdclient.FilerClient
-	client            util_http_client.HTTPClientInterface
-	bucketRegistry    *BucketRegistry
-	credentialManager *credential.CredentialManager
-	bucketConfigCache *BucketConfigCache
-	policyEngine      *BucketPolicyEngine // Engine for evaluating bucket policies
+	option                *S3ApiServerOption
+	iam                   *IdentityAccessManagement
+	iamIntegration        *S3IAMIntegration // Advanced IAM integration for JWT authentication
+	cb                    *CircuitBreaker
+	randomClientId        int32
+	filerGuard            *security.Guard
+	filerClient           *wdclient.FilerClient
+	client                util_http_client.HTTPClientInterface
+	bucketRegistry        *BucketRegistry
+	credentialManager     *credential.CredentialManager
+	bucketConfigCache     *BucketConfigCache
+	policyEngine          *BucketPolicyEngine // Engine for evaluating bucket policies
+	inFlightDataSize      int64
+	inFlightUploads       int64
+	inFlightDataLimitCond *sync.Cond
 }

 func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -135,17 +141,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 	}

 	s3ApiServer = &S3ApiServer{
-		option:            option,
-		iam:               iam,
-		randomClientId:    util.RandomInt32(),
-		filerGuard:        security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
-		filerClient:       filerClient,
-		cb:                NewCircuitBreaker(option),
-		credentialManager: iam.credentialManager,
-		bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
-		policyEngine:      policyEngine, // Initialize bucket policy engine
+		option:                option,
+		iam:                   iam,
+		randomClientId:        util.RandomInt32(),
+		filerGuard:            security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+		filerClient:           filerClient,
+		cb:                    NewCircuitBreaker(option),
+		credentialManager:     iam.credentialManager,
+		bucketConfigCache:     NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
+		policyEngine:          policyEngine, // Initialize bucket policy engine
+		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
 	}

+	// Set s3a reference in circuit breaker for upload limiting
+	s3ApiServer.cb.s3a = s3ApiServer
+
 	// Pass policy engine to IAM for bucket policy evaluation
 	// This avoids circular dependency by not passing the entire S3ApiServer
 	iam.policyEngine = policyEngine
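
For readers skimming the patch, the following is a minimal standalone sketch of the admission pattern that the change folds into CircuitBreaker.Limit(). It is not SeaweedFS code: the uploadGate type and all of its names are illustrative assumptions, and for simplicity it keeps every counter update under the condition variable's mutex instead of using atomics as the patch does. The point it demonstrates is the one argued in the commit message: a request with a missing Content-Length counts as zero bytes against the size limit, but it still occupies one upload slot, so the file-count limit is what closes the size-only bypass.

// A minimal, self-contained sketch (assumed names, not SeaweedFS code) of the
// size-plus-count admission gate used by the patch.
package main

import (
	"fmt"
	"sync"
)

type uploadGate struct {
	maxBytes   int64 // like ConcurrentUploadLimit; 0 means unlimited
	maxUploads int64 // like ConcurrentFileUploadLimit; 0 means unlimited

	inFlightBytes   int64
	inFlightUploads int64
	cond            *sync.Cond
}

func newUploadGate(maxBytes, maxUploads int64) *uploadGate {
	return &uploadGate{maxBytes: maxBytes, maxUploads: maxUploads, cond: sync.NewCond(new(sync.Mutex))}
}

// acquire blocks until the request fits under both limits, then reserves capacity.
// A negative contentLength (missing Content-Length header) counts as zero bytes,
// but it still takes one upload slot.
func (g *uploadGate) acquire(contentLength int64) {
	if contentLength < 0 {
		contentLength = 0
	}
	g.cond.L.Lock()
	defer g.cond.L.Unlock()
	for (g.maxBytes != 0 && g.inFlightBytes > g.maxBytes) ||
		(g.maxUploads != 0 && g.inFlightUploads >= g.maxUploads) {
		g.cond.Wait()
	}
	g.inFlightBytes += contentLength
	g.inFlightUploads++
}

// release returns the reserved capacity and wakes one waiting request.
func (g *uploadGate) release(contentLength int64) {
	if contentLength < 0 {
		contentLength = 0
	}
	g.cond.L.Lock()
	g.inFlightBytes -= contentLength
	g.inFlightUploads--
	g.cond.L.Unlock()
	g.cond.Signal()
}

func main() {
	// Allow at most 128 MiB of in-flight data and 4 concurrent uploads.
	g := newUploadGate(128*1024*1024, 4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			g.acquire(-1) // simulate a PUT with no Content-Length header
			defer g.release(-1)
			fmt.Println("upload", id, "admitted")
		}(i)
	}
	wg.Wait()
}

With the patch applied, the corresponding limits are configured with -concurrentUploadLimitMB and -concurrentFileUploadLimit on weed s3, or s3.concurrentUploadLimitMB and s3.concurrentFileUploadLimit on weed filer; in the real handler the counters live on S3ApiServer and the gate runs only for write actions when at least one limit is non-zero.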