@@ -45,8 +45,11 @@ type FilerClient struct {
 	grpcTimeout time.Duration
 	cacheSize   int   // Number of historical vidMap snapshots to keep
 	clientId    int32 // Unique client identifier for gRPC metadata
-	failureThreshold int32         // Number of consecutive failures before circuit opens
-	resetTimeout     time.Duration // Time to wait before re-checking unhealthy filer
+	failureThreshold   int32         // Circuit breaker: consecutive failures before circuit opens
+	resetTimeout       time.Duration // Circuit breaker: time before re-checking unhealthy filer
+	maxRetries         int           // Retry: maximum retry attempts for transient failures
+	initialRetryWait   time.Duration // Retry: initial wait time before first retry
+	retryBackoffFactor float64       // Retry: backoff multiplier for wait time
 }

 // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -62,6 +65,9 @@ type FilerClientOption struct {
 	CacheSize        int           // Number of historical vidMap snapshots (0 = use default)
 	FailureThreshold int32         // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
 	ResetTimeout     time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
+	MaxRetries         int           // Retry: maximum retry attempts for transient failures (0 = use default of 3)
+	InitialRetryWait   time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
+	RetryBackoffFactor float64       // Retry: backoff multiplier for wait time (0 = use default of 1.5)
 }

 // NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -78,6 +84,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	cacheSize := DefaultVidMapCacheSize
 	failureThreshold := int32(3)     // Default: 3 consecutive failures before circuit opens
 	resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
+	maxRetries := 3                  // Default: 3 retry attempts for transient failures
+	initialRetryWait := time.Second  // Default: 1 second initial retry wait
+	retryBackoffFactor := 1.5        // Default: 1.5x backoff multiplier

 	// Override with provided options
 	if len(opts) > 0 && opts[0] != nil {
@@ -97,6 +106,15 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		if opt.ResetTimeout > 0 {
 			resetTimeout = opt.ResetTimeout
 		}
+		if opt.MaxRetries > 0 {
+			maxRetries = opt.MaxRetries
+		}
+		if opt.InitialRetryWait > 0 {
+			initialRetryWait = opt.InitialRetryWait
+		}
+		if opt.RetryBackoffFactor > 0 {
+			retryBackoffFactor = opt.RetryBackoffFactor
+		}
 	}

 	// Initialize health tracking for each filer
@@ -116,6 +134,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		clientId:         rand.Int31(), // Random client ID for gRPC metadata tracking
 		failureThreshold: failureThreshold,
 		resetTimeout:     resetTimeout,
+		maxRetries:         maxRetries,
+		initialRetryWait:   initialRetryWait,
+		retryBackoffFactor: retryBackoffFactor,
 	}

 	// Create provider that references this FilerClient for failover support
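For orientation (not part of the diff): a minimal caller-side sketch of the new options. The field names and defaults come from the hunks above; the specific values are illustrative assumptions, and the full NewFilerClient signature is truncated in the hunk headers, so the constructor call itself is omitted.

```go
// Hypothetical configuration passed to NewFilerClient via its options argument
// (read as opts[0] above). Zero values keep the defaults: 3 failures / 30s for
// the circuit breaker, and 3 retries / 1s / 1.5x for the retry policy.
opt := &FilerClientOption{
	FailureThreshold:   5,                      // open the circuit after 5 consecutive failures
	ResetTimeout:       time.Minute,            // re-probe an unhealthy filer after 1 minute
	MaxRetries:         4,                      // retry transient lookup failures up to 4 times
	InitialRetryWait:   500 * time.Millisecond, // wait 500ms before the first retry
	RetryBackoffFactor: 2.0,                    // then 1s, 2s, ... between later retries
}
```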
@@ -257,10 +278,10 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	fc := p.filerClient
 	result := make(map[string][]Location)

-	// Retry transient failures with exponential backoff
+	// Retry transient failures with configurable backoff
 	var lastErr error
-	waitTime := time.Second
-	maxRetries := 3
+	waitTime := fc.initialRetryWait
+	maxRetries := fc.maxRetries

 	for retry := 0; retry < maxRetries; retry++ {
 		// Try all filer addresses with round-robin starting from current index
@@ -358,7 +379,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
 				n, retry+1, maxRetries, waitTime, lastErr)
 			time.Sleep(waitTime)
-			waitTime = waitTime * 3 / 2 // Multiplicative backoff with 1.5x factor: 1s, 1.5s, 2.25s
+			waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor)
 		}
 	}

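As a quick sanity check on the backoff arithmetic (a standalone sketch with assumed values, not code from this PR), the configurable multiplier reproduces the old hard-coded 1.5x schedule when left at its default:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Same update rule as LookupVolumeIds: multiply the wait by the factor after each failed attempt.
	waitTime := time.Second // default initialRetryWait
	factor := 1.5           // default retryBackoffFactor
	for retry := 0; retry < 3; retry++ {
		fmt.Printf("attempt %d failed, waiting %v\n", retry+1, waitTime)
		waitTime = time.Duration(float64(waitTime) * factor)
	}
	// Prints 1s, 1.5s, 2.25s, matching the schedule the removed "waitTime * 3 / 2" comment described.
}
```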