|
|
|
@ -9,6 +9,16 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/stats" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
|
|
|
|
// Used to calculate timeout scaling based on data transferred
|
|
|
|
minThroughputBytesPerSecond = 4000 |
|
|
|
|
|
|
|
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
|
|
|
|
// This prevents indefinite connections while allowing time for server-side chunk fetches
|
|
|
|
graceTimeCapMultiplier = 3 |
|
|
|
) |
|
|
|
|
|
|
|
// Listener wraps a net.Listener, and gives a place to store the timeout
|
|
|
|
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
|
|
|
|
type Listener struct { |
|
|
|
@ -45,7 +55,15 @@ type Conn struct { |
|
|
|
|
|
|
|
func (c *Conn) Read(b []byte) (count int, e error) { |
|
|
|
if c.ReadTimeout != 0 { |
|
|
|
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1))) |
|
|
|
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
|
|
|
|
// Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
|
|
|
|
// After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
|
|
|
|
bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * c.ReadTimeout.Seconds()) |
|
|
|
if bytesPerTimeout <= 0 { |
|
|
|
bytesPerTimeout = 1 // Prevent division by zero
|
|
|
|
} |
|
|
|
timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1) |
|
|
|
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier)) |
|
|
|
if err != nil { |
|
|
|
return 0, err |
|
|
|
} |
|
|
|
@ -62,25 +80,42 @@ func (c *Conn) Write(b []byte) (count int, e error) { |
|
|
|
if c.WriteTimeout != 0 { |
|
|
|
now := time.Now() |
|
|
|
// Calculate timeout with two components:
|
|
|
|
// 1. Base timeout scaled by cumulative data (minimum 4KB/s throughput)
|
|
|
|
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
|
|
|
|
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
|
|
|
|
timeoutMultiplier := time.Duration(c.bytesWritten/40000 + 1) |
|
|
|
|
|
|
|
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
|
|
|
|
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
|
|
|
|
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
|
|
|
|
bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * c.WriteTimeout.Seconds()) |
|
|
|
if bytesPerTimeout <= 0 { |
|
|
|
bytesPerTimeout = 1 // Prevent division by zero
|
|
|
|
} |
|
|
|
timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) |
|
|
|
baseTimeout := c.WriteTimeout * timeoutMultiplier |
|
|
|
|
|
|
|
|
|
|
|
// If it's been a while since last write, add grace time for server-side chunk fetches
|
|
|
|
// But cap it to avoid keeping slow clients connected indefinitely
|
|
|
|
//
|
|
|
|
// The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
|
|
|
|
// exceeds base timeout, independent of throughput scaling.
|
|
|
|
if !c.lastWrite.IsZero() { |
|
|
|
timeSinceLastWrite := now.Sub(c.lastWrite) |
|
|
|
if timeSinceLastWrite > c.WriteTimeout { |
|
|
|
// Add grace time, but cap at 3x base timeout to handle slow clients
|
|
|
|
// Add grace time capped at graceTimeCapMultiplier * scaled timeout.
|
|
|
|
// This allows total deadline up to 4x scaled timeout for server-side delays.
|
|
|
|
//
|
|
|
|
// Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
|
|
|
|
// If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s
|
|
|
|
// Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage
|
|
|
|
// But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s
|
|
|
|
graceTime := timeSinceLastWrite |
|
|
|
if graceTime > baseTimeout*3 { |
|
|
|
graceTime = baseTimeout * 3 |
|
|
|
if graceTime > baseTimeout*graceTimeCapMultiplier { |
|
|
|
graceTime = baseTimeout * graceTimeCapMultiplier |
|
|
|
} |
|
|
|
baseTimeout += graceTime |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) |
|
|
|
if err != nil { |
|
|
|
return 0, err |
|
|
|
|