From 3183a49698d77659cd15434ccd58c3002bc8c266 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Dec 2025 18:31:46 -0800 Subject: [PATCH] fix: S3 downloads failing after idle timeout (#7626) * fix: S3 downloads failing after idle timeout (#7618) The idle timeout was incorrectly terminating active downloads because read and write deadlines were managed independently. During a download, the server writes data but rarely reads, so the read deadline would expire even though the connection was actively being used. Changes: 1. Simplify to single Timeout field - since this is a 'no activity timeout' where any activity extends the deadline, separate read/write timeouts are unnecessary. Now uses SetDeadline() which sets both at once. 2. Implement proper 'no activity timeout' - any activity (read or write) now extends the deadline. The connection only times out when there's genuinely no activity in either direction. 3. Increase default S3 idleTimeout from 10s to 120s for additional safety margin when fetching chunks from slow storage backends. Fixes #7618 * Update weed/util/net_timeout.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- k8s/charts/seaweedfs/values.yaml | 6 +- weed/command/filer.go | 2 +- weed/command/s3.go | 2 +- weed/command/server.go | 2 +- weed/util/net_timeout.go | 119 +++++++------------------------ 5 files changed, 32 insertions(+), 99 deletions(-) diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index 520323dce..bddfd622d 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -979,9 +979,9 @@ s3: extraEnvironmentVars: # Custom command line arguments to add to the s3 command - # Example to fix connection idle seconds: - extraArgs: ["-idleTimeout=30"] - # extraArgs: [] + # Default idleTimeout is 120 seconds. Example to customize: + # extraArgs: ["-idleTimeout=300"] + extraArgs: [] # used to configure livenessProbe on s3 containers # diff --git a/weed/command/filer.go b/weed/command/filer.go index 86991a181..bb7092543 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -128,7 +128,7 @@ func init() { filerS3Options.tlsCACertificate = cmdFiler.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file") filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") - filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 120, "connection idle seconds") filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") diff --git a/weed/command/s3.go b/weed/command/s3.go index 61222336b..ace6dd427 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -84,7 +84,7 @@ func init() { s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-.sock") - s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds") + s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 120, "connection idle seconds") s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") } diff --git a/weed/command/server.go b/weed/command/server.go index d729502f0..5683f1fc5 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -169,7 +169,7 @@ func init() { s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-.sock") s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") - s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 10, "connection idle seconds") + s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 120, "connection idle seconds") s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index 75e475f6b..9aeb5cd48 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -9,22 +9,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" ) -const ( - // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s) - // Used to calculate timeout scaling based on data transferred - minThroughputBytesPerSecond = 4000 - - // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout - // This prevents indefinite connections while allowing time for server-side chunk fetches - graceTimeCapMultiplier = 3 -) - // Listener wraps a net.Listener, and gives a place to store the timeout // parameters. On Accept, it will wrap the net.Conn with our own Conn for us. type Listener struct { net.Listener - ReadTimeout time.Duration - WriteTimeout time.Duration + Timeout time.Duration } func (l *Listener) Accept() (net.Conn, error) { @@ -34,103 +23,50 @@ func (l *Listener) Accept() (net.Conn, error) { } stats.ConnectionOpen() tc := &Conn{ - Conn: c, - ReadTimeout: l.ReadTimeout, - WriteTimeout: l.WriteTimeout, + Conn: c, + Timeout: l.Timeout, } return tc, nil } -// Conn wraps a net.Conn, and sets a deadline for every read -// and write operation. +// Conn wraps a net.Conn and implements a "no activity timeout". +// Any activity (read or write) resets the deadline, so the connection +// only times out when there's no activity in either direction. type Conn struct { net.Conn - ReadTimeout time.Duration - WriteTimeout time.Duration - isClosed bool - bytesRead int64 - bytesWritten int64 - lastWrite time.Time + Timeout time.Duration + isClosed bool } -// calculateBytesPerTimeout calculates the expected number of bytes that should -// be transferred during one timeout period, based on the minimum throughput. -// Returns at least 1 to prevent division by zero. -func calculateBytesPerTimeout(timeout time.Duration) int64 { - bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds()) - if bytesPerTimeout <= 0 { - return 1 // Prevent division by zero +// extendDeadline extends the connection deadline from now. +// This implements "no activity timeout" - any activity keeps the connection alive. +func (c *Conn) extendDeadline() error { + if c.Timeout > 0 { + return c.Conn.SetDeadline(time.Now().Add(c.Timeout)) } - return bytesPerTimeout + return nil } func (c *Conn) Read(b []byte) (count int, e error) { - if c.ReadTimeout != 0 { - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout) - timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1) - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier)) - if err != nil { - return 0, err - } + // Extend deadline before reading - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Read(b) if e == nil { stats.BytesIn(int64(count)) - c.bytesRead += int64(count) } return } func (c *Conn) Write(b []byte) (count int, e error) { - if c.WriteTimeout != 0 { - now := time.Now() - // Calculate timeout with two components: - // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s) - // 2. Additional grace period if there was a gap since last write (for chunk fetch delays) - - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout) - timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) - baseTimeout := c.WriteTimeout * timeoutMultiplier - - // If it's been a while since last write, add grace time for server-side chunk fetches - // But cap it to avoid keeping slow clients connected indefinitely - // - // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time - // exceeds base timeout, independent of throughput scaling. - if !c.lastWrite.IsZero() { - timeSinceLastWrite := now.Sub(c.lastWrite) - if timeSinceLastWrite > c.WriteTimeout { - // Add grace time capped at graceTimeCapMultiplier * scaled timeout. - // This allows total deadline up to 4x scaled timeout for server-side delays. - // - // Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s - // If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s - // Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage - // But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s - graceTime := timeSinceLastWrite - if graceTime > baseTimeout*graceTimeCapMultiplier { - graceTime = baseTimeout * graceTimeCapMultiplier - } - baseTimeout += graceTime - } - } - - err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) - if err != nil { - return 0, err - } + // Extend deadline before writing - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Write(b) if e == nil { stats.BytesOut(int64(count)) - c.bytesWritten += int64(count) - c.lastWrite = time.Now() } return } @@ -153,9 +89,8 @@ func NewListener(addr string, timeout time.Duration) (ipListener net.Listener, e } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } return @@ -168,9 +103,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" { @@ -181,9 +115,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } localListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } }