
refactor: make circuit breaker parameters configurable in FilerClient

The circuit breaker failure threshold (3) and reset timeout (30s) were
hardcoded, making it difficult to tune the client's behavior in different
deployment environments without modifying the code.

Problem:
  func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
    // ... failureCount and lastFailureTime loaded from fc.filerHealth[index] ...
    if failureCount < 3 {              // Hardcoded threshold
      return false
    }
    if time.Since(lastFailureTime) > 30*time.Second {  // Hardcoded timeout
      return false
    }
    return true
  }

Different environments have different needs:
  - High-traffic production: may want lower threshold (2) for faster failover
  - Development/testing: may want higher threshold (5) to tolerate flaky networks
  - Low-latency services: may want shorter reset timeout (10s)
  - Batch processing: may want longer reset timeout (60s)

Solution:
  1. Added fields to FilerClientOption:
     - FailureThreshold int32 (default: 3)
     - ResetTimeout time.Duration (default: 30s)

  2. Added fields to FilerClient:
     - failureThreshold int32
     - resetTimeout time.Duration

  3. Applied defaults in NewFilerClient with option override:
     failureThreshold := int32(3)
     resetTimeout := 30 * time.Second
     if opt.FailureThreshold > 0 {
       failureThreshold = opt.FailureThreshold
     }
     if opt.ResetTimeout > 0 {
       resetTimeout = opt.ResetTimeout
     }

  4. Updated shouldSkipUnhealthyFiler to use configurable values:
     if failureCount < fc.failureThreshold { ... }
     if time.Since(lastFailureTime) > fc.resetTimeout { ... }
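
  Putting steps 1-4 together, a minimal sketch of how the updated check reads
  (abridged from the full method in the diff below):

    func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
      health := fc.filerHealth[index]
      failureCount := atomic.LoadInt32(&health.failureCount)

      // Below the configured threshold: keep using this filer
      if failureCount < fc.failureThreshold {
        return false
      }

      // No recorded failure timestamp: nothing to skip
      lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
      if lastFailureNs == 0 {
        return false
      }

      // Reset timeout elapsed: give the unhealthy filer another chance
      if time.Since(time.Unix(0, lastFailureNs)) > fc.resetTimeout {
        return false
      }

      return true // circuit open: skip this filer for now
    }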

Benefits:
  ✓ Tunable for different deployment environments
  ✓ Backward compatible (defaults match previous hardcoded values)
  ✓ No breaking changes to existing code
  ✓ Better maintainability and flexibility

Example usage:
  // Aggressive failover for low-latency production
  fc := wdclient.NewFilerClient(filers, dialOpt, dc, &wdclient.FilerClientOption{
    FailureThreshold: 2,
    ResetTimeout:     10 * time.Second,
  })

  // Tolerant of flaky networks in development
  fc := wdclient.NewFilerClient(filers, dialOpt, dc, &wdclient.FilerClientOption{
    FailureThreshold: 5,
    ResetTimeout:     60 * time.Second,
  })
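
  Leaving the new fields at their zero values keeps the previous behavior, since
  the defaults are applied whenever a field is unset (sketch following the call
  shape above):

  // All zero-value fields fall back to defaults (threshold 3, reset timeout 30s)
  fc := wdclient.NewFilerClient(filers, dialOpt, dc, &wdclient.FilerClientOption{})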
pull/7518/head
chrislu committed 2 weeks ago (commit ba2dcfc26c)

weed/wdclient/filer_client.go: 60 changed lines
@@ -37,14 +37,16 @@ type filerHealth struct {
 // Tracks filer health to avoid repeatedly trying known-unhealthy filers
 type FilerClient struct {
     *vidMapClient
-    filerAddresses []pb.ServerAddress
-    filerIndex     int32          // atomic: current filer index for round-robin
-    filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
-    grpcDialOption grpc.DialOption
-    urlPreference  UrlPreference
-    grpcTimeout    time.Duration
-    cacheSize      int   // Number of historical vidMap snapshots to keep
-    clientId       int32 // Unique client identifier for gRPC metadata
+    filerAddresses   []pb.ServerAddress
+    filerIndex       int32          // atomic: current filer index for round-robin
+    filerHealth      []*filerHealth // health status per filer (same order as filerAddresses)
+    grpcDialOption   grpc.DialOption
+    urlPreference    UrlPreference
+    grpcTimeout      time.Duration
+    cacheSize        int           // Number of historical vidMap snapshots to keep
+    clientId         int32         // Unique client identifier for gRPC metadata
+    failureThreshold int32         // Number of consecutive failures before circuit opens
+    resetTimeout     time.Duration // Time to wait before re-checking unhealthy filer
 }
 
 // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -55,9 +57,11 @@ type filerVolumeProvider struct {
 
 // FilerClientOption holds optional configuration for FilerClient
 type FilerClientOption struct {
-    GrpcTimeout   time.Duration
-    UrlPreference UrlPreference
-    CacheSize     int // Number of historical vidMap snapshots (0 = use default)
+    GrpcTimeout      time.Duration
+    UrlPreference    UrlPreference
+    CacheSize        int           // Number of historical vidMap snapshots (0 = use default)
+    FailureThreshold int32         // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
+    ResetTimeout     time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
 }
 
 // NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -72,6 +76,8 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
     grpcTimeout := 5 * time.Second
     urlPref := PreferUrl
     cacheSize := DefaultVidMapCacheSize
+    failureThreshold := int32(3)     // Default: 3 consecutive failures before circuit opens
+    resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
 
     // Override with provided options
     if len(opts) > 0 && opts[0] != nil {
@@ -85,6 +91,12 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
         if opt.CacheSize > 0 {
             cacheSize = opt.CacheSize
         }
+        if opt.FailureThreshold > 0 {
+            failureThreshold = opt.FailureThreshold
+        }
+        if opt.ResetTimeout > 0 {
+            resetTimeout = opt.ResetTimeout
+        }
     }
 
     // Initialize health tracking for each filer
@@ -94,14 +106,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
     }
 
     fc := &FilerClient{
-        filerAddresses: filerAddresses,
-        filerIndex:     0,
-        filerHealth:    health,
-        grpcDialOption: grpcDialOption,
-        urlPreference:  urlPref,
-        grpcTimeout:    grpcTimeout,
-        cacheSize:      cacheSize,
-        clientId:       rand.Int31(), // Random client ID for gRPC metadata tracking
+        filerAddresses:   filerAddresses,
+        filerIndex:       0,
+        filerHealth:      health,
+        grpcDialOption:   grpcDialOption,
+        urlPreference:    urlPref,
+        grpcTimeout:      grpcTimeout,
+        cacheSize:        cacheSize,
+        clientId:         rand.Int31(), // Random client ID for gRPC metadata tracking
+        failureThreshold: failureThreshold,
+        resetTimeout:     resetTimeout,
     }
 
     // Create provider that references this FilerClient for failover support
@@ -202,18 +216,18 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
     health := fc.filerHealth[index]
     failureCount := atomic.LoadInt32(&health.failureCount)
 
-    // Allow up to 2 failures before skipping
-    if failureCount < 3 {
+    // Check if failure count exceeds threshold
+    if failureCount < fc.failureThreshold {
         return false
     }
 
-    // Re-check unhealthy filers every 30 seconds
+    // Re-check unhealthy filers after reset timeout
     lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
     if lastFailureNs == 0 {
         return false // Never failed, shouldn't skip
     }
 
     lastFailureTime := time.Unix(0, lastFailureNs)
-    if time.Since(lastFailureTime) > 30*time.Second {
+    if time.Since(lastFailureTime) > fc.resetTimeout {
         return false // Time to re-check
     }
