From ba2dcfc26c2cfef573ee92ec5483af285f2638d8 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 20 Nov 2025 17:51:03 -0800
Subject: [PATCH] refactor: make circuit breaker parameters configurable in
 FilerClient
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The circuit breaker failure threshold (3) and reset timeout (30s) were
hardcoded, making it difficult to tune the client's behavior in different
deployment environments without modifying the code.

Problem:

    func shouldSkipUnhealthyFiler(index int32) bool {
        if failureCount < 3 { // Hardcoded threshold
            return false
        }
        if time.Since(lastFailureTime) > 30*time.Second { // Hardcoded timeout
            return false
        }
    }

Different environments have different needs:
- High-traffic production: may want lower threshold (2) for faster failover
- Development/testing: may want higher threshold (5) to tolerate flaky networks
- Low-latency services: may want shorter reset timeout (10s)
- Batch processing: may want longer reset timeout (60s)

Solution:

1. Added fields to FilerClientOption:
   - FailureThreshold int32 (default: 3)
   - ResetTimeout time.Duration (default: 30s)

2. Added fields to FilerClient:
   - failureThreshold int32
   - resetTimeout time.Duration

3. Applied defaults in NewFilerClient with option override:

    failureThreshold := int32(3)
    resetTimeout := 30 * time.Second
    if opt.FailureThreshold > 0 {
        failureThreshold = opt.FailureThreshold
    }
    if opt.ResetTimeout > 0 {
        resetTimeout = opt.ResetTimeout
    }

4. Updated shouldSkipUnhealthyFiler to use configurable values:

    if failureCount < fc.failureThreshold { ... }
    if time.Since(lastFailureTime) > fc.resetTimeout { ... }

Benefits:
✓ Tunable for different deployment environments
✓ Backward compatible (defaults match previous hardcoded values)
✓ No breaking changes to existing code
✓ Better maintainability and flexibility

Example usage:

    // Aggressive failover for low-latency production
    fc := wdclient.NewFilerClient(filers, dialOpt, dc, &wdclient.FilerClientOption{
        FailureThreshold: 2,
        ResetTimeout:     10 * time.Second,
    })

    // Tolerant of flaky networks in development
    fc := wdclient.NewFilerClient(filers, dialOpt, dc, &wdclient.FilerClientOption{
        FailureThreshold: 5,
        ResetTimeout:     60 * time.Second,
    })
---
 weed/wdclient/filer_client.go | 60 +++++++++++++++++++++--------------
 1 file changed, 37 insertions(+), 23 deletions(-)

diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index 766b1c7fa..0cb1530a9 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -37,14 +37,16 @@ type filerHealth struct {
 // Tracks filer health to avoid repeatedly trying known-unhealthy filers
 type FilerClient struct {
 	*vidMapClient
-	filerAddresses []pb.ServerAddress
-	filerIndex     int32          // atomic: current filer index for round-robin
-	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
-	grpcDialOption grpc.DialOption
-	urlPreference  UrlPreference
-	grpcTimeout    time.Duration
-	cacheSize      int   // Number of historical vidMap snapshots to keep
-	clientId       int32 // Unique client identifier for gRPC metadata
+	filerAddresses   []pb.ServerAddress
+	filerIndex       int32          // atomic: current filer index for round-robin
+	filerHealth      []*filerHealth // health status per filer (same order as filerAddresses)
+	grpcDialOption   grpc.DialOption
+	urlPreference    UrlPreference
+	grpcTimeout      time.Duration
+	cacheSize        int           // Number of historical vidMap snapshots to keep
+	clientId         int32         // Unique client identifier for gRPC metadata
+	failureThreshold int32         // Number of consecutive failures before circuit opens
+	resetTimeout     time.Duration // Time to wait before re-checking unhealthy filer
 }
 
 // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -55,9 +57,11 @@ type filerVolumeProvider struct {
 
 // FilerClientOption holds optional configuration for FilerClient
 type FilerClientOption struct {
-	GrpcTimeout   time.Duration
-	UrlPreference UrlPreference
-	CacheSize     int // Number of historical vidMap snapshots (0 = use default)
+	GrpcTimeout      time.Duration
+	UrlPreference    UrlPreference
+	CacheSize        int           // Number of historical vidMap snapshots (0 = use default)
+	FailureThreshold int32         // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
+	ResetTimeout     time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
 }
 
 // NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -72,6 +76,8 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	grpcTimeout := 5 * time.Second
 	urlPref := PreferUrl
 	cacheSize := DefaultVidMapCacheSize
+	failureThreshold := int32(3)     // Default: 3 consecutive failures before circuit opens
+	resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
 
 	// Override with provided options
 	if len(opts) > 0 && opts[0] != nil {
@@ -85,6 +91,12 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		if opt.CacheSize > 0 {
 			cacheSize = opt.CacheSize
 		}
+		if opt.FailureThreshold > 0 {
+			failureThreshold = opt.FailureThreshold
+		}
+		if opt.ResetTimeout > 0 {
+			resetTimeout = opt.ResetTimeout
+		}
 	}
 
 	// Initialize health tracking for each filer
@@ -94,14 +106,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	}
 
 	fc := &FilerClient{
-		filerAddresses: filerAddresses,
-		filerIndex:     0,
-		filerHealth:    health,
-		grpcDialOption: grpcDialOption,
-		urlPreference:  urlPref,
-		grpcTimeout:    grpcTimeout,
-		cacheSize:      cacheSize,
-		clientId:       rand.Int31(), // Random client ID for gRPC metadata tracking
+		filerAddresses:   filerAddresses,
+		filerIndex:       0,
+		filerHealth:      health,
+		grpcDialOption:   grpcDialOption,
+		urlPreference:    urlPref,
+		grpcTimeout:      grpcTimeout,
+		cacheSize:        cacheSize,
+		clientId:         rand.Int31(), // Random client ID for gRPC metadata tracking
+		failureThreshold: failureThreshold,
+		resetTimeout:     resetTimeout,
 	}
 
 	// Create provider that references this FilerClient for failover support
@@ -202,18 +216,18 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	health := fc.filerHealth[index]
 	failureCount := atomic.LoadInt32(&health.failureCount)
 
-	// Allow up to 2 failures before skipping
-	if failureCount < 3 {
+	// Check if failure count exceeds threshold
+	if failureCount < fc.failureThreshold {
 		return false
 	}
 
-	// Re-check unhealthy filers every 30 seconds
+	// Re-check unhealthy filers after reset timeout
 	lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
 	if lastFailureNs == 0 {
 		return false // Never failed, shouldn't skip
 	}
 	lastFailureTime := time.Unix(0, lastFailureNs)
-	if time.Since(lastFailureTime) > 30*time.Second {
+	if time.Since(lastFailureTime) > fc.resetTimeout {
 		return false // Time to re-check
 	}