Browse Source

fix: S3 downloads failing after idle timeout (#7626)

* fix: S3 downloads failing after idle timeout (#7618)

The idle timeout was incorrectly terminating active downloads because
read and write deadlines were managed independently. During a download,
the server writes data but rarely reads, so the read deadline would
expire even though the connection was actively being used.

Changes:
1. Simplify to single Timeout field - since this is a 'no activity timeout'
   where any activity extends the deadline, separate read/write timeouts
   are unnecessary. Now uses SetDeadline() which sets both at once.

2. Implement proper 'no activity timeout' - any activity (read or write)
   now extends the deadline. The connection only times out when there's
   genuinely no activity in either direction.

3. Increase default S3 idleTimeout from 10s to 120s for additional safety
   margin when fetching chunks from slow storage backends.

Fixes #7618

* Update weed/util/net_timeout.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
pull/7632/head
Chris Lu 5 days ago
committed by GitHub
parent
commit
3183a49698
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 6
      k8s/charts/seaweedfs/values.yaml
  2. 2
      weed/command/filer.go
  3. 2
      weed/command/s3.go
  4. 2
      weed/command/server.go
  5. 119
      weed/util/net_timeout.go

6
k8s/charts/seaweedfs/values.yaml

@ -979,9 +979,9 @@ s3:
extraEnvironmentVars: extraEnvironmentVars:
# Custom command line arguments to add to the s3 command # Custom command line arguments to add to the s3 command
# Example to fix connection idle seconds:
extraArgs: ["-idleTimeout=30"]
# extraArgs: []
# Default idleTimeout is 120 seconds. Example to customize:
# extraArgs: ["-idleTimeout=300"]
extraArgs: []
# used to configure livenessProbe on s3 containers # used to configure livenessProbe on s3 containers
# #

2
weed/command/filer.go

@ -128,7 +128,7 @@ func init() {
filerS3Options.tlsCACertificate = cmdFiler.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file") filerS3Options.tlsCACertificate = cmdFiler.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file")
filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate") filerS3Options.tlsVerifyClientCert = cmdFiler.Flag.Bool("s3.tlsVerifyClientCert", false, "whether to verify the client's certificate")
filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") filerS3Options.bindIp = cmdFiler.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 10, "connection idle seconds")
filerS3Options.idleTimeout = cmdFiler.Flag.Int("s3.idleTimeout", 120, "connection idle seconds")
filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") filerS3Options.concurrentUploadLimitMB = cmdFiler.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3")
filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") filerS3Options.concurrentFileUploadLimit = cmdFiler.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")

2
weed/command/s3.go

@ -84,7 +84,7 @@ func init() {
s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock") s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 10, "connection idle seconds")
s3StandaloneOptions.idleTimeout = cmdS3.Flag.Int("idleTimeout", 120, "connection idle seconds")
s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") s3StandaloneOptions.concurrentUploadLimitMB = cmdS3.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited") s3StandaloneOptions.concurrentFileUploadLimit = cmdS3.Flag.Int("concurrentFileUploadLimit", 0, "limit number of concurrent file uploads, 0 means unlimited")
} }

2
weed/command/server.go

@ -169,7 +169,7 @@ func init() {
s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock") s3Options.localSocket = cmdServer.Flag.String("s3.localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.") s3Options.bindIp = cmdServer.Flag.String("s3.ip.bind", "", "ip address to bind to. If empty, default to same as -ip.bind option.")
s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 10, "connection idle seconds")
s3Options.idleTimeout = cmdServer.Flag.Int("s3.idleTimeout", 120, "connection idle seconds")
s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3") s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 128, "limit total concurrent upload size for S3")
s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")

119
weed/util/net_timeout.go

@ -9,22 +9,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
) )
const (
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
// Used to calculate timeout scaling based on data transferred
minThroughputBytesPerSecond = 4000
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
// This prevents indefinite connections while allowing time for server-side chunk fetches
graceTimeCapMultiplier = 3
)
// Listener wraps a net.Listener, and gives a place to store the timeout // Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us. // parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
type Listener struct { type Listener struct {
net.Listener net.Listener
ReadTimeout time.Duration
WriteTimeout time.Duration
Timeout time.Duration
} }
func (l *Listener) Accept() (net.Conn, error) { func (l *Listener) Accept() (net.Conn, error) {
@ -34,103 +23,50 @@ func (l *Listener) Accept() (net.Conn, error) {
} }
stats.ConnectionOpen() stats.ConnectionOpen()
tc := &Conn{ tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
Conn: c,
Timeout: l.Timeout,
} }
return tc, nil return tc, nil
} }
// Conn wraps a net.Conn, and sets a deadline for every read
// and write operation.
// Conn wraps a net.Conn and implements a "no activity timeout".
// Any activity (read or write) resets the deadline, so the connection
// only times out when there's no activity in either direction.
type Conn struct { type Conn struct {
net.Conn net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
isClosed bool
bytesRead int64
bytesWritten int64
lastWrite time.Time
Timeout time.Duration
isClosed bool
} }
// calculateBytesPerTimeout calculates the expected number of bytes that should
// be transferred during one timeout period, based on the minimum throughput.
// Returns at least 1 to prevent division by zero.
func calculateBytesPerTimeout(timeout time.Duration) int64 {
bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds())
if bytesPerTimeout <= 0 {
return 1 // Prevent division by zero
// extendDeadline extends the connection deadline from now.
// This implements "no activity timeout" - any activity keeps the connection alive.
func (c *Conn) extendDeadline() error {
if c.Timeout > 0 {
return c.Conn.SetDeadline(time.Now().Add(c.Timeout))
} }
return bytesPerTimeout
return nil
} }
func (c *Conn) Read(b []byte) (count int, e error) { func (c *Conn) Read(b []byte) (count int, e error) {
if c.ReadTimeout != 0 {
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout)
timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1)
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier))
if err != nil {
return 0, err
}
// Extend deadline before reading - any activity keeps connection alive
if err := c.extendDeadline(); err != nil {
return 0, err
} }
count, e = c.Conn.Read(b) count, e = c.Conn.Read(b)
if e == nil { if e == nil {
stats.BytesIn(int64(count)) stats.BytesIn(int64(count))
c.bytesRead += int64(count)
} }
return return
} }
func (c *Conn) Write(b []byte) (count int, e error) { func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
now := time.Now()
// Calculate timeout with two components:
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
baseTimeout := c.WriteTimeout * timeoutMultiplier
// If it's been a while since last write, add grace time for server-side chunk fetches
// But cap it to avoid keeping slow clients connected indefinitely
//
// The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
// exceeds base timeout, independent of throughput scaling.
if !c.lastWrite.IsZero() {
timeSinceLastWrite := now.Sub(c.lastWrite)
if timeSinceLastWrite > c.WriteTimeout {
// Add grace time capped at graceTimeCapMultiplier * scaled timeout.
// This allows total deadline up to 4x scaled timeout for server-side delays.
//
// Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
// If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s
// Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage
// But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s
graceTime := timeSinceLastWrite
if graceTime > baseTimeout*graceTimeCapMultiplier {
graceTime = baseTimeout * graceTimeCapMultiplier
}
baseTimeout += graceTime
}
}
err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
if err != nil {
return 0, err
}
// Extend deadline before writing - any activity keeps connection alive
if err := c.extendDeadline(); err != nil {
return 0, err
} }
count, e = c.Conn.Write(b) count, e = c.Conn.Write(b)
if e == nil { if e == nil {
stats.BytesOut(int64(count)) stats.BytesOut(int64(count))
c.bytesWritten += int64(count)
c.lastWrite = time.Now()
} }
return return
} }
@ -153,9 +89,8 @@ func NewListener(addr string, timeout time.Duration) (ipListener net.Listener, e
} }
ipListener = &Listener{ ipListener = &Listener{
Listener: listener,
ReadTimeout: timeout,
WriteTimeout: timeout,
Listener: listener,
Timeout: timeout,
} }
return return
@ -168,9 +103,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis
} }
ipListener = &Listener{ ipListener = &Listener{
Listener: listener,
ReadTimeout: timeout,
WriteTimeout: timeout,
Listener: listener,
Timeout: timeout,
} }
if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" { if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" {
@ -181,9 +115,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis
} }
localListener = &Listener{ localListener = &Listener{
Listener: listener,
ReadTimeout: timeout,
WriteTimeout: timeout,
Listener: listener,
Timeout: timeout,
} }
} }

Loading…
Cancel
Save