
fix: create fresh timeout context for each filer retry attempt

The timeout context was created once at function start and reused across
all retry attempts, causing subsequent retries to run with progressively
shorter (or expired) deadlines.

Problem flow:
  Line 244: timeoutCtx, cancel := context.WithTimeout(ctx, 5s)
  defer cancel()

  Retry 1, filer 0: client.LookupVolume(timeoutCtx, ...) ← 5s available 
  Retry 1, filer 1: client.LookupVolume(timeoutCtx, ...) ← 3s left
  Retry 1, filer 2: client.LookupVolume(timeoutCtx, ...) ← 0.5s left
  Retry 2, filer 0: client.LookupVolume(timeoutCtx, ...) ← EXPIRED! 

Result: Retries always fail with DeadlineExceeded, defeating the purpose
of retries.
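To reproduce the failure mode in isolation, here is a minimal standalone Go sketch (not the SeaweedFS code; the 5s timeout and 2s sleeps are arbitrary stand-ins for fc.grpcTimeout and slow LookupVolume calls) that reuses a single timeout context across a retry loop:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx := context.Background()

        // Created once before the loop and reused -- the shape of the bug.
        timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        for attempt := 1; attempt <= 4; attempt++ {
            deadline, _ := timeoutCtx.Deadline()
            fmt.Printf("attempt %d: %v left, err=%v\n",
                attempt, time.Until(deadline).Round(time.Second), timeoutCtx.Err())
            time.Sleep(2 * time.Second) // stand-in for a slow lookup call
        }
    }

Each iteration sees roughly 2s less of the shared deadline, and the fourth attempt starts with the context already expired.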

Fix:
  Moved context.WithTimeout inside the per-filer loop, creating a fresh
  timeout context for each attempt:

    for x := 0; x < n; x++ {
      timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
      err := pb.WithGrpcFilerClient(..., func(client) {
        resp, err := client.LookupVolume(timeoutCtx, ...)
        ...
      })
      cancel()  // Clean up immediately after call
    }

Benefits:
  - Each filer attempt gets full fc.grpcTimeout (default 5s)
  - Retries actually have time to complete
  - No context leaks (cancel called after each attempt)
  - More predictable timeout behavior

Example with fix:
  Retry 1, filer 0: fresh 5s timeout 
  Retry 1, filer 1: fresh 5s timeout 
  Retry 2, filer 0: fresh 5s timeout 

Total max time: 3 retries × 3 filers × 5s = 45s (plus backoff)

Note: The outer ctx (from caller) still provides overall cancellation if
the caller cancels or times out the entire operation.
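A small sketch of why that still holds (assumed standalone snippet, not the actual code path): each per-attempt context is derived from the caller's ctx, so cancelling the caller aborts an in-flight attempt before its own timeout fires:

    parentCtx, cancelAll := context.WithCancel(context.Background())
    go func() {
        time.Sleep(1 * time.Second)
        cancelAll() // caller abandons the whole operation
    }()

    attemptCtx, cancel := context.WithTimeout(parentCtx, 5*time.Second)
    defer cancel()

    <-attemptCtx.Done()           // fires after ~1s, not the full 5s
    fmt.Println(attemptCtx.Err()) // context.Canceled, inherited from the parent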
pull/7518/head · chrislu · 2 weeks ago · parent commit f0c27ffbb2

weed/wdclient/filer_client.go (18 lines changed)

@@ -26,8 +26,8 @@ const (
 // filerHealth tracks the health status of a filer
 type filerHealth struct {
 	failureCount      int32 // atomic: consecutive failures
 	lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
 }
 
 // FilerClient provides volume location services by querying a filer
@@ -198,12 +198,12 @@ func isRetryableGrpcError(err error) bool {
 func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	health := fc.filerHealth[index]
 	failureCount := atomic.LoadInt32(&health.failureCount)
 
 	// Allow up to 2 failures before skipping
 	if failureCount < 3 {
 		return false
 	}
 
 	// Re-check unhealthy filers every 30 seconds
 	lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
 	if lastFailureNs == 0 {
@@ -213,7 +213,7 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	if time.Since(lastFailureTime) > 30*time.Second {
 		return false // Time to re-check
 	}
 
 	return true // Skip this unhealthy filer
 }
@@ -240,10 +240,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	fc := p.filerClient
 	result := make(map[string][]Location)
 
-	// Create a timeout context for the gRPC call
-	timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
-	defer cancel()
-
 	// Convert grpcTimeout to milliseconds for the signature parameter
 	timeoutMs := int32(fc.grpcTimeout.Milliseconds())
@@ -272,6 +268,9 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 		filerAddress := fc.filerAddresses[i]
 
+		// Create a fresh timeout context for each filer attempt
+		// This ensures each retry gets the full grpcTimeout, not a diminishing deadline
+		timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
 		err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
 			resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
 				VolumeIds: volumeIds,
@@ -313,6 +312,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			return nil
 		})
+		cancel() // Clean up timeout context immediately after call returns
 
 		if err != nil {
 			glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)

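As a sanity check on the pattern the diff introduces, a hypothetical test sketch (illustrative only, not part of this commit; the package name and 5s timeout are assumptions) that asserts every attempt starts with an essentially full deadline regardless of how long earlier attempts took:

    package wdclient_test

    import (
        "context"
        "testing"
        "time"
    )

    func TestFreshTimeoutPerAttempt(t *testing.T) {
        ctx := context.Background()
        grpcTimeout := 5 * time.Second // assumed default, mirroring fc.grpcTimeout

        for attempt := 0; attempt < 3; attempt++ {
            attemptCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
            deadline, _ := attemptCtx.Deadline()
            if remaining := time.Until(deadline); remaining < grpcTimeout-time.Second {
                t.Fatalf("attempt %d started with only %v remaining", attempt, remaining)
            }
            time.Sleep(200 * time.Millisecond) // simulated per-attempt latency
            cancel()                           // release the timer before the next attempt
        }
    }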