@ -1,13 +1,24 @@
package util
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"net"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
)
const (
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
// Used to calculate timeout scaling based on data transferred
minThroughputBytesPerSecond = 4000
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
// This prevents indefinite connections while allowing time for server-side chunk fetches
graceTimeCapMultiplier = 3
)
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
type Listener struct {
@ -39,11 +50,28 @@ type Conn struct {
isClosed bool
bytesRead int64
bytesWritten int64
lastWrite time . Time
}
// calculateBytesPerTimeout calculates the expected number of bytes that should
// be transferred during one timeout period, based on the minimum throughput.
// Returns at least 1 to prevent division by zero.
func calculateBytesPerTimeout ( timeout time . Duration ) int64 {
bytesPerTimeout := int64 ( float64 ( minThroughputBytesPerSecond ) * timeout . Seconds ( ) )
if bytesPerTimeout <= 0 {
return 1 // Prevent division by zero
}
return bytesPerTimeout
}
func ( c * Conn ) Read ( b [ ] byte ) ( count int , e error ) {
if c . ReadTimeout != 0 {
err := c . Conn . SetReadDeadline ( time . Now ( ) . Add ( c . ReadTimeout * time . Duration ( c . bytesRead / 40000 + 1 ) ) )
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout ( c . ReadTimeout )
timeoutMultiplier := time . Duration ( c . bytesRead / bytesPerTimeout + 1 )
err := c . Conn . SetReadDeadline ( time . Now ( ) . Add ( c . ReadTimeout * timeoutMultiplier ) )
if err != nil {
return 0 , err
}
@ -58,8 +86,42 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func ( c * Conn ) Write ( b [ ] byte ) ( count int , e error ) {
if c . WriteTimeout != 0 {
// minimum 4KB/s
err := c . Conn . SetWriteDeadline ( time . Now ( ) . Add ( c . WriteTimeout * time . Duration ( c . bytesWritten / 40000 + 1 ) ) )
now := time . Now ( )
// Calculate timeout with two components:
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout ( c . WriteTimeout )
timeoutMultiplier := time . Duration ( c . bytesWritten / bytesPerTimeout + 1 )
baseTimeout := c . WriteTimeout * timeoutMultiplier
// If it's been a while since last write, add grace time for server-side chunk fetches
// But cap it to avoid keeping slow clients connected indefinitely
//
// The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
// exceeds base timeout, independent of throughput scaling.
if ! c . lastWrite . IsZero ( ) {
timeSinceLastWrite := now . Sub ( c . lastWrite )
if timeSinceLastWrite > c . WriteTimeout {
// Add grace time capped at graceTimeCapMultiplier * scaled timeout.
// This allows total deadline up to 4x scaled timeout for server-side delays.
//
// Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
// If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s
// Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage
// But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s
graceTime := timeSinceLastWrite
if graceTime > baseTimeout * graceTimeCapMultiplier {
graceTime = baseTimeout * graceTimeCapMultiplier
}
baseTimeout += graceTime
}
}
err := c . Conn . SetWriteDeadline ( now . Add ( baseTimeout ) )
if err != nil {
return 0 , err
}
@ -68,6 +130,7 @@ func (c *Conn) Write(b []byte) (count int, e error) {
if e == nil {
stats . BytesOut ( int64 ( count ) )
c . bytesWritten += int64 ( count )
c . lastWrite = time . Now ( )
}
return
}