
improve: implement gRPC-aware retry for FilerClient volume lookups

The previous implementation used util.Retry, which only retries errors
whose message contains the string "transport". This is insufficient for
handling the full range of transient gRPC errors.
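
For illustration (not part of this change): a typical transient gRPC
failure renders without the word "transport" in its message, so the old
substring check would treat it as permanent. A minimal sketch, with a
made-up error message:

// Minimal sketch: a transient gRPC error (hypothetical message) whose text
// never mentions "transport", so a substring-based retry check gives up on it.
package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	err := status.Error(codes.Unavailable, "filer is restarting")
	fmt.Println(err)                                        // rpc error: code = Unavailable desc = filer is restarting
	fmt.Println(strings.Contains(err.Error(), "transport")) // false: the old check would not retry
}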

Changes:
1. Added isRetryableGrpcError() to properly inspect gRPC status codes
   - Retries: Unavailable, DeadlineExceeded, ResourceExhausted, Aborted
   - Falls back to string matching for non-gRPC network errors (see the sketch after this list)

2. Replaced util.Retry with custom retry loop
   - 3 attempts with exponential backoff (1s, 1.5s, 2.25s)
   - Tries all N filers on each attempt (N*3 total attempts max)
   - Fast-fails on non-retryable errors (NotFound, PermissionDenied, etc.)

3. Improved logging
   - Shows both filer attempt (x/N) and retry attempt (y/3)
   - Logs retry reason and wait time for debugging
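
A rough sketch of the classification described in items 1 and 2 above
(the error messages are invented; only the status codes matter for the
retry decision):

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	examples := []error{
		status.Error(codes.Unavailable, "filer overloaded"), // transient: retried with backoff
		status.Error(codes.NotFound, "volume 42 not found"), // permanent: fails without retries
		errors.New("dial tcp: connection refused"),          // no gRPC code: string fallback applies
	}
	for _, err := range examples {
		// status.Code returns codes.Unknown for plain (non-gRPC) errors
		fmt.Printf("code=%v err=%v\n", status.Code(err), err)
	}
}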

Benefits:
- Better handling of transient gRPC failures (server restarts, load spikes)
- Faster failure for permanent errors (no wasted retries)
- More informative logs for troubleshooting
- Maintains existing HA failover across multiple filers

Example: If all 3 filers return Unavailable (server overload):
- Attempt 1: try all 3 filers, wait 1s
- Attempt 2: try all 3 filers, wait 1.5s
- Attempt 3: try all 3 filers, fail

Example: If a filer returns NotFound (the volume doesn't exist):
- Attempt 1: try all 3 filers, fast-fail (no retry)
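
The wait times above come from multiplying a time.Duration by 3/2 after
each failed round; a standalone sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	wait := time.Second
	for retry := 0; retry < 3; retry++ {
		fmt.Printf("attempt %d: try all filers\n", retry+1)
		if retry < 2 {
			fmt.Printf("  then sleep %v\n", wait) // 1s after attempt 1, 1.5s after attempt 2
		}
		wait = wait * 3 / 2 // integer time.Duration math: 1s -> 1.5s -> 2.25s
	}
}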

chrislu committed 85ad2e9a13 (pull/7518/head), 2 weeks ago

1 changed file: weed/wdclient/filer_client.go (65 lines changed)

@@ -8,11 +8,12 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 // UrlPreference controls which URL to use for volume access
@@ -139,8 +140,35 @@ func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 	}
 }
 
+// isRetryableGrpcError checks if a gRPC error is transient and should be retried
+func isRetryableGrpcError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check gRPC status code
+	st, ok := status.FromError(err)
+	if ok {
+		switch st.Code() {
+		case codes.Unavailable, // Server unavailable (temporary)
+			codes.DeadlineExceeded,  // Request timeout
+			codes.ResourceExhausted, // Rate limited or overloaded
+			codes.Aborted:           // Operation aborted (might succeed on retry)
+			return true
+		}
+	}
+
+	// Fallback to string matching for non-gRPC errors (e.g., network errors)
+	errStr := err.Error()
+	return strings.Contains(errStr, "transport") ||
+		strings.Contains(errStr, "connection") ||
+		strings.Contains(errStr, "timeout") ||
+		strings.Contains(errStr, "unavailable")
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
+// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
 // Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
 // an Error field. This implementation handles the current structure while being prepared
 // for future error reporting enhancements.
@@ -155,9 +183,13 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 	// Convert grpcTimeout to milliseconds for the signature parameter
 	timeoutMs := int32(fc.grpcTimeout.Milliseconds())
 
 	// Try all filer addresses with round-robin starting from current index
+	// Retry transient failures with exponential backoff
 	var lastErr error
-	err := util.Retry("filer volume lookup", func() error {
+	waitTime := time.Second
+	maxRetries := 3
+	for retry := 0; retry < maxRetries; retry++ {
+		// Try all filer addresses with round-robin starting from current index
 		i := atomic.LoadInt32(&fc.filerIndex)
 		n := int32(len(fc.filerAddresses))
@@ -207,7 +239,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			})
 
 			if err != nil {
-				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d): %v", filerAddress, x+1, n, err)
+				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
 				lastErr = err
 				i++
 				if i >= n {
@@ -219,16 +251,25 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			// Success - update the preferred filer index for next time
 			atomic.StoreInt32(&fc.filerIndex, i)
 			glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
-			return nil
+			return result, nil
 		}
 
-		// All filers failed
-		return fmt.Errorf("all %d filer(s) failed, last error: %w", n, lastErr)
-	})
-
-	if err != nil {
-		return nil, fmt.Errorf("filer volume lookup failed for %d volume(s): %w", len(volumeIds), err)
+		// All filers failed on this attempt
+		// Check if the error is retryable (transient gRPC error)
+		if !isRetryableGrpcError(lastErr) {
+			// Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
+			return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
+		}
+
+		// Transient error - retry if we have attempts left
+		if retry < maxRetries-1 {
+			glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
+				n, retry+1, maxRetries, waitTime, lastErr)
+			time.Sleep(waitTime)
+			waitTime = waitTime * 3 / 2 // Exponential backoff: 1s, 1.5s, 2.25s
+		}
 	}
 
-	return result, nil
+	// All retries exhausted
+	return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
 }