diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index 520323dce..bddfd622d 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -979,9 +979,9 @@ s3: extraEnvironmentVars: # Custom command line arguments to add to the s3 command - # Example to fix connection idle seconds: - extraArgs: ["-idleTimeout=30"] - # extraArgs: [] + # Default idleTimeout is 120 seconds. Example to customize: + # extraArgs: ["-idleTimeout=300"] + extraArgs: [] # used to configure livenessProbe on s3 containers #
diff --git a/weed/command/filer.go b/weed/command/filer.go index 86991a181..bb7092543 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -128,7 +128,7 @@ func init() { filerS3Options.tlsCACertificate = cmdFiler.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file") filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") - filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 120, "connection idle seconds") filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
diff --git a/weed/command/s3.go b/weed/command/s3.go index 61222336b..ace6dd427 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -84,7 +84,7 @@ func init() { s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-.sock") - s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") + s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 120, "connection idle seconds") s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") }
diff --git a/weed/command/server.go b/weed/command/server.go index d729502f0..5683f1fc5 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -169,7 +169,7 @@ func init() { s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-.sock") s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") - s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 120, "connection idle seconds") s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index 75e475f6b..9aeb5cd48 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -9,22 +9,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" ) -const ( - // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s) - // Used to calculate timeout scaling based on data transferred - minThroughputBytesPerSecond = 4000 - - // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout - // This prevents indefinite connections while allowing time for server-side chunk fetches - graceTimeCapMultiplier = 3 -) - // Listener wraps a net.Listener, and gives a place to store the timeout // parameters. On Accept, it will wrap the net.Conn with our own Conn for us. type Listener struct { net.Listener - ReadTimeout time.Duration - WriteTimeout time.Duration + Timeout time.Duration } func (l *Listener) Accept() (net.Conn, error) { @@ -34,103 +23,50 @@ func (l *Listener) Accept() (net.Conn, error) { } stats.ConnectionOpen() tc := &Conn{ - Conn: c, - ReadTimeout: l.ReadTimeout, - WriteTimeout: l.WriteTimeout, + Conn: c, + Timeout: l.Timeout, } return tc, nil } -// Conn wraps a net.Conn, and sets a deadline for every read -// and write operation. +// Conn wraps a net.Conn and implements a "no activity timeout".
+// Any activity (read or write) resets the deadline, so the connection +// only times out when there's no activity in either direction. type Conn struct { net.Conn - ReadTimeout time.Duration - WriteTimeout time.Duration - isClosed bool - bytesRead int64 - bytesWritten int64 - lastWrite time.Time + Timeout time.Duration + isClosed bool } -// calculateBytesPerTimeout calculates the expected number of bytes that should -// be transferred during one timeout period, based on the minimum throughput. -// Returns at least 1 to prevent division by zero. -func calculateBytesPerTimeout(timeout time.Duration) int64 { - bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds()) - if bytesPerTimeout <= 0 { - return 1 // Prevent division by zero +// extendDeadline extends the connection deadline from now. +// This implements "no activity timeout" - any activity keeps the connection alive. +func (c *Conn) extendDeadline() error { + if c.Timeout > 0 { + return c.Conn.SetDeadline(time.Now().Add(c.Timeout)) } - return bytesPerTimeout + return nil } func (c *Conn) Read(b []byte) (count int, e error) { - if c.ReadTimeout != 0 { - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout) - timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1) - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier)) - if err != nil { - return 0, err - } + // Extend deadline before reading - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Read(b) if e == nil { stats.BytesIn(int64(count)) - c.bytesRead += int64(count) } return } func (c *Conn) Write(b []byte) (count int, e error) { - if c.WriteTimeout != 0 { - now := time.Now() - // Calculate timeout with two components: - // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s) - // 2. Additional grace period if there was a gap since last write (for chunk fetch delays) - - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout) - timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) - baseTimeout := c.WriteTimeout * timeoutMultiplier - - // If it's been a while since last write, add grace time for server-side chunk fetches - // But cap it to avoid keeping slow clients connected indefinitely - // - // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time - // exceeds base timeout, independent of throughput scaling. - if !c.lastWrite.IsZero() { - timeSinceLastWrite := now.Sub(c.lastWrite) - if timeSinceLastWrite > c.WriteTimeout { - // Add grace time capped at graceTimeCapMultiplier * scaled timeout. - // This allows total deadline up to 4x scaled timeout for server-side delays.
- // - // Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s - // If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s - // Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage - // But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s - graceTime := timeSinceLastWrite - if graceTime > baseTimeout*graceTimeCapMultiplier { - graceTime = baseTimeout * graceTimeCapMultiplier - } - baseTimeout += graceTime - } - } - - err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) - if err != nil { - return 0, err - } + // Extend deadline before writing - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Write(b) if e == nil { stats.BytesOut(int64(count)) - c.bytesWritten += int64(count) - c.lastWrite = time.Now() } return } @@ -153,9 +89,8 @@ func NewListener(addr string, timeout time.Duration) (ipListener net.Listener, e } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } return @@ -168,9 +103,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" { @@ -181,9 +115,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } localListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } }