diff --git a/weed/command/filer.go b/weed/command/filer.go index f26ff9134..86991a181 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -129,6 +129,8 @@ func init() { filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") + filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") diff --git a/weed/command/s3.go b/weed/command/s3.go index 995d15f8a..61222336b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -57,6 +57,8 @@ type S3Options struct { localSocket *string certProvider certprovider.Provider idleTimeout *int + concurrentUploadLimitMB *int + concurrentFileUploadLimit *int } func init() { @@ -83,6 +85,8 @@ func init() { s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-.sock") s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") + s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") } var cmdS3 = &Command{ @@ -275,6 +279,8 @@ func (s3opt *S3Options) startS3Server() bool { DataCenter: *s3opt.dataCenter, FilerGroup: filerGroup, IamConfig: iamConfigPath, // Advanced IAM config (optional) + ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024, + ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 2f5e1f580..3c4f55a23 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -21,6 +21,7 @@ type CircuitBreaker struct { Enabled bool counters map[string]*int64 limitations map[string]int64 + s3a *S3ApiServer } func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { @@ -89,6 +90,48 @@ func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerCo func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { return func(w http.ResponseWriter, r *http.Request) { + // Apply upload limiting for write actions if configured + if cb.s3a != nil && (action == s3_constants.ACTION_WRITE) && + (cb.s3a.option.ConcurrentUploadLimit != 0 || cb.s3a.option.ConcurrentFileUploadLimit != 0) { + + // Get content length, default to 0 if not provided + contentLength := r.ContentLength + if contentLength < 0 { + contentLength = 0 + } + + // Wait until in flight data is less than the limit + cb.s3a.inFlightDataLimitCond.L.Lock() + inFlightDataSize := atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads := atomic.LoadInt64(&cb.s3a.inFlightUploads) + + // Wait if either data size limit or file count limit is exceeded + for (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) || + (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + if (cb.s3a.option.ConcurrentUploadLimit != 0 && inFlightDataSize > cb.s3a.option.ConcurrentUploadLimit) { + glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, cb.s3a.option.ConcurrentUploadLimit) + } + if (cb.s3a.option.ConcurrentFileUploadLimit != 0 && inFlightUploads >= cb.s3a.option.ConcurrentFileUploadLimit) { + glog.V(4).Infof("wait because inflight uploads %d >= %d", inFlightUploads, cb.s3a.option.ConcurrentFileUploadLimit) + } + cb.s3a.inFlightDataLimitCond.Wait() + inFlightDataSize = atomic.LoadInt64(&cb.s3a.inFlightDataSize) + inFlightUploads = atomic.LoadInt64(&cb.s3a.inFlightUploads) + } + cb.s3a.inFlightDataLimitCond.L.Unlock() + + // Increment counters + atomic.AddInt64(&cb.s3a.inFlightUploads, 1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, contentLength) + defer func() { + // Decrement counters + atomic.AddInt64(&cb.s3a.inFlightUploads, -1) + atomic.AddInt64(&cb.s3a.inFlightDataSize, -contentLength) + cb.s3a.inFlightDataLimitCond.Signal() + }() + } + + // Apply circuit breaker logic if !cb.Enabled { f(w, r) return diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index dcf3a55f2..a1a3f100b 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -9,6 +9,7 @@ import ( "os" "slices" "strings" + "sync" "time" "github.com/gorilla/mux" @@ -48,22 +49,27 @@ type S3ApiServerOption struct { DataCenter string FilerGroup string IamConfig string // Advanced IAM configuration file path + ConcurrentUploadLimit int64 + ConcurrentFileUploadLimit int64 } type S3ApiServer struct { s3_pb.UnimplementedSeaweedS3Server - option *S3ApiServerOption - iam *IdentityAccessManagement - iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication - cb *CircuitBreaker - randomClientId int32 - filerGuard *security.Guard - filerClient *wdclient.FilerClient - client util_http_client.HTTPClientInterface - bucketRegistry *BucketRegistry - credentialManager *credential.CredentialManager - bucketConfigCache *BucketConfigCache - policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies + option *S3ApiServerOption + iam *IdentityAccessManagement + iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication + cb *CircuitBreaker + randomClientId int32 + filerGuard *security.Guard + filerClient *wdclient.FilerClient + client util_http_client.HTTPClientInterface + bucketRegistry *BucketRegistry + credentialManager *credential.CredentialManager + bucketConfigCache *BucketConfigCache + policyEngine *BucketPolicyEngine // Engine for evaluating bucket policies + inFlightDataSize int64 + inFlightUploads int64 + inFlightDataLimitCond *sync.Cond } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -135,17 +141,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl } s3ApiServer = &S3ApiServer{ - option: option, - iam: iam, - randomClientId: util.RandomInt32(), - filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), - filerClient: filerClient, - cb: NewCircuitBreaker(option), - credentialManager: iam.credentialManager, - bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven - policyEngine: policyEngine, // Initialize bucket policy engine + option: option, + iam: iam, + randomClientId: util.RandomInt32(), + filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + filerClient: filerClient, + cb: NewCircuitBreaker(option), + credentialManager: iam.credentialManager, + bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven + policyEngine: policyEngine, // Initialize bucket policy engine + inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } + // Set s3a reference in circuit breaker for upload limiting + s3ApiServer.cb.s3a = s3ApiServer + // Pass policy engine to IAM for bucket policy evaluation // This avoids circular dependency by not passing the entire S3ApiServer iam.policyEngine = policyEngine