Browse Source

s3: add ConcurrentFileUploadLimit option to limit number of concurrent uploads

This adds a new configuration option ConcurrentFileUploadLimit that limits
the number of concurrent file uploads based on file count, complementing
the existing ConcurrentUploadLimit which limits based on total data size.

This addresses an OOM vulnerability where requests with missing/zero
Content-Length headers could bypass the size-based rate limiter.

Changes:
- Add ConcurrentUploadLimit and ConcurrentFileUploadLimit fields to S3ApiServerOption
- Add inFlightDataSize, inFlightUploads, and inFlightDataLimitCond to S3ApiServer
- Add s3a reference to CircuitBreaker for upload limiting
- Enhance CircuitBreaker.Limit() to apply upload limiting for write actions
- Add -concurrentUploadLimitMB and -concurrentFileUploadLimit command line flags
- Add s3.concurrentUploadLimitMB and s3.concurrentFileUploadLimit flags to filer command

The upload limiting is integrated into the existing CircuitBreaker.Limit()
function, avoiding creation of new wrapper functions and reusing the existing
handler registration pattern.

Fixes #7529
pull/7554/head
Chris Lu 1 week ago
parent
commit
656372e634
  1. 2
      weed/command/filer.go
  2. 6
      weed/command/s3.go
  3. 43
      weed/s3api/s3api_circuit_breaker.go
  4. 10
      weed/s3api/s3api_server.go

2
weed/command/filer.go

@ -129,6 +129,8 @@ func init() {
filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate")
filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds")
filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3")
filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
// start webdav on filer // start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")

6
weed/command/s3.go

@ -57,6 +57,8 @@ type S3Options struct {
localSocket *string localSocket *string
certProvider certprovider.Provider certProvider certprovider.Provider
idleTimeout *int idleTimeout *int
concurrentUploadLimitMB *int
concurrentFileUploadLimit *int
} }
func init() { func init() {
@ -83,6 +85,8 @@ func init() {
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds")
s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
} }
var cmdS3 = &Command{ var cmdS3 = &Command{
@ -275,6 +279,8 @@ func (s3opt *S3Options) startS3Server() bool {
DataCenter: *s3opt.dataCenter, DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup, FilerGroup: filerGroup,
IamConfig: iamConfigPath, // Advanced IAM config (optional) IamConfig: iamConfigPath, // Advanced IAM config (optional)
ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024,
ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit),
}) })
if s3ApiServer_err != nil { if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

43
weed/s3api/s3api_circuit_breaker.go

@ -21,6 +21,7 @@ type CircuitBreaker struct {
Enabled bool Enabled bool
counters map[string]*int64 counters map[string]*int64
limitations map[string]int64 limitations map[string]int64
s3a *S3ApiServer
} }
func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo
func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Apply upload limiting for write actions if configured
if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) &&
(cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) {
// Get content length, default to 0 if not provided
contentLength := r.ContentLength
if contentLength < 0 {
contentLength = 0
}
// Wait until in flight data is less than the limit
cb.s3a.inFlightDataLimitCond.L.Lock()
inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize)
inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads)
// Wait if either data size limit or file count limit is exceeded
for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) ||
(cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) {
glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit)
}
if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) {
glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit)
}
cb.s3a.inFlightDataLimitCond.Wait()
inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize)
inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads)
}
cb.s3a.inFlightDataLimitCond.L.Unlock()
// Increment counters
atomic.AddInt64(&cb.s3a.inFlightUploads, 1)
atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength)
defer func() {
// Decrement counters
atomic.AddInt64(&cb.s3a.inFlightUploads, -1)
atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength)
cb.s3a.inFlightDataLimitCond.Signal()
}()
}
// Apply circuit breaker logic
if !cb.Enabled { if !cb.Enabled {
f(w, r) f(w, r)
return return

10
weed/s3api/s3api_server.go

@ -9,6 +9,7 @@ import (
"os" "os"
"slices" "slices"
"strings" "strings"
"sync"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -48,6 +49,8 @@ type S3ApiServerOption struct {
DataCenter string DataCenter string
FilerGroup string FilerGroup string
IamConfig string // Advanced IAM configuration file path IamConfig string // Advanced IAM configuration file path
ConcurrentUploadLimit int64
ConcurrentFileUploadLimit int64
} }
type S3ApiServer struct { type S3ApiServer struct {
@ -64,6 +67,9 @@ type S3ApiServer struct {
credentialManager *credential.CredentialManager credentialManager *credential.CredentialManager
bucketConfigCache *BucketConfigCache bucketConfigCache *BucketConfigCache
policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies
inFlightDataSize int64
inFlightUploads int64
inFlightDataLimitCond *sync.Cond
} }
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@ -144,8 +150,12 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
credentialManager: iam.credentialManager, credentialManager: iam.credentialManager,
bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
policyEngine: policyEngine, // Initialize bucket policy engine policyEngine: policyEngine, // Initialize bucket policy engine
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
} }
// Set s3a reference in circuit breaker for upload limiting
s3ApiServer.cb.s3a = s3ApiServer
// Pass policy engine to IAM for bucket policy evaluation // Pass policy engine to IAM for bucket policy evaluation
// This avoids circular dependency by not passing the entire S3ApiServer // This avoids circular dependency by not passing the entire S3ApiServer
iam.policyEngine = policyEngine iam.policyEngine = policyEngine

Loading…
Cancel
Save