diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index e27d4e730..2e0d20095 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -8,11 +8,12 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 // UrlPreference controls which URL to use for volume access
@@ -139,8 +140,35 @@ func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 	}
 }
 
+// isRetryableGrpcError checks if a gRPC error is transient and should be retried
+func isRetryableGrpcError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check gRPC status code
+	st, ok := status.FromError(err)
+	if ok {
+		switch st.Code() {
+		case codes.Unavailable, // Server unavailable (temporary)
+			codes.DeadlineExceeded, // Request timeout
+			codes.ResourceExhausted, // Rate limited or overloaded
+			codes.Aborted: // Operation aborted (might succeed on retry)
+			return true
+		}
+	}
+
+	// Fallback to string matching for non-gRPC errors (e.g., network errors)
+	errStr := err.Error()
+	return strings.Contains(errStr, "transport") ||
+		strings.Contains(errStr, "connection") ||
+		strings.Contains(errStr, "timeout") ||
+		strings.Contains(errStr, "unavailable")
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
+// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
 // Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
 // an Error field. This implementation handles the current structure while being prepared
 // for future error reporting enhancements.
@@ -155,9 +183,13 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	// Convert grpcTimeout to milliseconds for the signature parameter
 	timeoutMs := int32(fc.grpcTimeout.Milliseconds())
 
-	// Try all filer addresses with round-robin starting from current index
+	// Retry transient failures with exponential backoff
 	var lastErr error
-	err := util.Retry("filer volume lookup", func() error {
+	waitTime := time.Second
+	maxRetries := 3
+
+	for retry := 0; retry < maxRetries; retry++ {
+		// Try all filer addresses with round-robin starting from current index
 		i := atomic.LoadInt32(&fc.filerIndex)
 		n := int32(len(fc.filerAddresses))
 
@@ -207,7 +239,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 		})
 
 		if err != nil {
-			glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d): %v", filerAddress, x+1, n, err)
+			glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
 			lastErr = err
 			i++
 			if i >= n {
@@ -219,16 +251,25 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			// Success - update the preferred filer index for next time
 			atomic.StoreInt32(&fc.filerIndex, i)
 			glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
-			return nil
+			return result, nil
 		}
 
-		// All filers failed
-		return fmt.Errorf("all %d filer(s) failed, last error: %w", n, lastErr)
-	})
-
-	if err != nil {
-		return nil, fmt.Errorf("filer volume lookup failed for %d volume(s): %w", len(volumeIds), err)
+		// All filers failed on this attempt
+		// Check if the error is retryable (transient gRPC error)
+		if !isRetryableGrpcError(lastErr) {
+			// Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
+			return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
+		}
+
+		// Transient error - retry if we have attempts left
+		if retry < maxRetries-1 {
+			glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
+				n, retry+1, maxRetries, waitTime, lastErr)
+			time.Sleep(waitTime)
+			waitTime = waitTime * 3 / 2 // Exponential backoff: 1s, 1.5s, 2.25s
+		}
 	}
 
-	return result, nil
+	// All retries exhausted
+	return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
 }
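For reference, a minimal table-driven sketch of how the retry classification introduced above is expected to behave; the test name and cases are illustrative assumptions, not part of the change:

```go
// Hypothetical sketch (not part of the diff): exercises isRetryableGrpcError
// against representative gRPC status codes and a plain network error.
package wdclient

import (
	"errors"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestIsRetryableGrpcError(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want bool
	}{
		{"nil error", nil, false},
		{"Unavailable is retryable", status.Error(codes.Unavailable, "filer restarting"), true},
		{"DeadlineExceeded is retryable", status.Error(codes.DeadlineExceeded, "lookup timed out"), true},
		{"NotFound is not retryable", status.Error(codes.NotFound, "no such volume"), false},
		{"plain network error hits the string-match fallback", errors.New("connection refused"), true},
	}
	for _, c := range cases {
		if got := isRetryableGrpcError(c.err); got != c.want {
			t.Errorf("%s: isRetryableGrpcError() = %v, want %v", c.name, got, c.want)
		}
	}
}
```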