From ba07b3e4c68f34219f3ca10f8f3ddcc059ffb972 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 30 Oct 2025 16:43:29 -0700 Subject: [PATCH] network: Adaptive timeout (#7410) * server can start when no network for local dev * fixed "superfluous response.WriteHeader call" warning * adaptive based on last write time * more doc * refactoring --- weed/util/net_timeout.go | 71 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index f235a77b3..313d7f849 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -1,13 +1,24 @@ package util import ( - "github.com/seaweedfs/seaweedfs/weed/glog" "net" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/stats" ) +const ( + // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s) + // Used to calculate timeout scaling based on data transferred + minThroughputBytesPerSecond = 4000 + + // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout + // This prevents indefinite connections while allowing time for server-side chunk fetches + graceTimeCapMultiplier = 3 +) + // Listener wraps a net.Listener, and gives a place to store the timeout // parameters. On Accept, it will wrap the net.Conn with our own Conn for us. type Listener struct { @@ -39,11 +50,28 @@ type Conn struct { isClosed bool bytesRead int64 bytesWritten int64 + lastWrite time.Time +} + +// calculateBytesPerTimeout calculates the expected number of bytes that should +// be transferred during one timeout period, based on the minimum throughput. +// Returns at least 1 to prevent division by zero.
+func calculateBytesPerTimeout(timeout time.Duration) int64 { + bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds()) + if bytesPerTimeout <= 0 { + return 1 // Prevent division by zero + } + return bytesPerTimeout } func (c *Conn) Read(b []byte) (count int, e error) { if c.ReadTimeout != 0 { - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1))) + // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) + // Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB + // After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s + bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout) + timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1) + err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier)) if err != nil { return 0, err } @@ -58,8 +86,42 @@ func (c *Conn) Read(b []byte) (count int, e error) { func (c *Conn) Write(b []byte) (count int, e error) { if c.WriteTimeout != 0 { - // minimum 4KB/s - err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1))) + now := time.Now() + // Calculate timeout with two components: + // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s) + // 2. 
Additional grace period if there was a gap since last write (for chunk fetch delays) + + // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) + // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB + // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s + bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout) + timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) + baseTimeout := c.WriteTimeout * timeoutMultiplier + + // If it's been a while since last write, add grace time for server-side chunk fetches + // But cap it to avoid keeping slow clients connected indefinitely + // + // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time + // exceeds base timeout, independent of throughput scaling. + if !c.lastWrite.IsZero() { + timeSinceLastWrite := now.Sub(c.lastWrite) + if timeSinceLastWrite > c.WriteTimeout { + // Add grace time capped at graceTimeCapMultiplier * scaled timeout. + // This allows total deadline up to 4x scaled timeout for server-side delays. + // + // Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s + // If a 900s gap occurs fetching chunks: graceTime capped at 270s*3=810s + // Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage + // But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s + graceTime := timeSinceLastWrite + if graceTime > baseTimeout*graceTimeCapMultiplier { + graceTime = baseTimeout * graceTimeCapMultiplier + } + baseTimeout += graceTime + } + } + + err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) if err != nil { return 0, err } @@ -68,6 +130,7 @@ func (c *Conn) Write(b []byte) (count int, e error) { if e == nil { stats.BytesOut(int64(count)) c.bytesWritten += int64(count) + c.lastWrite = time.Now() } return }