From 3a5b5ea02ccb030b8b77b3f58ac26dd7f07f5f3a Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 20 Nov 2025 16:20:06 -0800
Subject: [PATCH] improve: add circuit breaker to skip known-unhealthy filers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The previous implementation tried all filers on every failure, including
known-unhealthy ones. This wasted time retrying permanently down filers.

Problem scenario (3 filers, filer0 is down):
- Last successful: filer1 (saved as filerIndex=1)
- Next lookup when filer1 fails:
  Retry 1: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Retry 2: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Retry 3: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Total wasted: 15 seconds on a known-bad filer!

Solution: Circuit breaker pattern
- Track consecutive failures per filer (atomic int32)
- Skip filers with 3+ consecutive failures
- Re-check unhealthy filers every 30 seconds
- Reset failure count on success

New behavior:
- filer0 fails 3 times → marked unhealthy
- Future lookups skip filer0 for 30 seconds
- After 30s, re-check filer0 (allows recovery)
- If filer0 succeeds, reset failure count to 0

Benefits:
1. Avoids wasting time on known-down filers
2. Still sticks to last healthy filer (via filerIndex)
3. Allows recovery (30s re-check window)
4. No configuration needed (automatic)

Implementation details:
- filerHealth struct tracks failureCount (atomic) + lastFailureTime
- shouldSkipUnhealthyFiler(): checks if we should skip this filer
- recordFilerSuccess(): resets failure count to 0
- recordFilerFailure(): increments count, updates timestamp
- Logs when skipping unhealthy filers (V(2) level)

Example with circuit breaker:
- filer0 down, saved filerIndex=1 (filer1 healthy)
- Lookup 1: filer1(ok) → Done (0.01s)
- Lookup 2: filer1(fail) → filer2(ok) → Done, save filerIndex=2 (0.01s)
- Lookup 3: filer2(fail) → skip filer0 (unhealthy) → filer1(ok) → Done (0.01s)

Much better than wasting 15s trying filer0 repeatedly!
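To make the rule concrete before the diff, here is a minimal sketch of the skip
decision described above (3+ consecutive failures trip the breaker, a 30-second
window re-opens it). The package and function names below are illustrative only
and are not part of the patch; the real implementation is the
shouldSkipUnhealthyFiler method in the diff that follows.

package sketch // illustrative package, not part of the patch

import "time"

// shouldSkip restates the circuit breaker rule from this commit message:
// skip a filer once it has failed 3+ times in a row, but probe it again
// after 30 seconds so a recovered filer can rejoin the rotation.
func shouldSkip(consecutiveFailures int32, lastFailure time.Time) bool {
	if consecutiveFailures < 3 {
		return false // under the threshold: keep trying this filer
	}
	if time.Since(lastFailure) > 30*time.Second {
		return false // cool-down elapsed: allow one re-check
	}
	return true // known-unhealthy: skip for now
}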
---
 weed/wdclient/filer_client.go | 63 ++++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index c6c2f8326..8065c4dfd 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -24,13 +24,21 @@ const (
 	PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
 )
 
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+	failureCount    int32     // atomic: consecutive failures
+	lastFailureTime time.Time // last time this filer failed
+}
+
 // FilerClient provides volume location services by querying a filer
 // It uses the shared vidMap cache for efficient lookups
 // Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
 type FilerClient struct {
 	*vidMapClient
 	filerAddresses []pb.ServerAddress
 	filerIndex     int32 // atomic: current filer index for round-robin
+	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
 	grpcDialOption grpc.DialOption
 	urlPreference  UrlPreference
 	grpcTimeout    time.Duration
@@ -77,9 +85,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		}
 	}
 
+	// Initialize health tracking for each filer
+	health := make([]*filerHealth, len(filerAddresses))
+	for i := range health {
+		health[i] = &filerHealth{}
+	}
+
 	fc := &FilerClient{
 		filerAddresses: filerAddresses,
 		filerIndex:     0,
+		filerHealth:    health,
 		grpcDialOption: grpcDialOption,
 		urlPreference:  urlPref,
 		grpcTimeout:    grpcTimeout,
@@ -166,6 +181,38 @@ func isRetryableGrpcError(err error) bool {
 		strings.Contains(errStr, "unavailable")
 }
 
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+	health := fc.filerHealth[index]
+	failureCount := atomic.LoadInt32(&health.failureCount)
+
+	// Allow up to 2 failures before skipping
+	if failureCount < 3 {
+		return false
+	}
+
+	// Re-check unhealthy filers every 30 seconds
+	if time.Since(health.lastFailureTime) > 30*time.Second {
+		return false // Time to re-check
+	}
+
+	return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+	health := fc.filerHealth[index]
+	atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+	health := fc.filerHealth[index]
+	atomic.AddInt32(&health.failureCount, 1)
+	health.lastFailureTime = time.Now()
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
 // Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
@@ -190,10 +237,22 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	for retry := 0; retry < maxRetries; retry++ {
 		// Try all filer addresses with round-robin starting from current index
+		// Skip known-unhealthy filers (circuit breaker pattern)
 		i := atomic.LoadInt32(&fc.filerIndex)
 		n := int32(len(fc.filerAddresses))
 
 		for x := int32(0); x < n; x++ {
+			// Circuit breaker: skip unhealthy filers
+			if fc.shouldSkipUnhealthyFiler(i) {
+				glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+					fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+				i++
+				if i >= n {
+					i = 0
+				}
+				continue
+			}
+
 			filerAddress := fc.filerAddresses[i]
 
 			err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -240,6 +299,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			if err != nil {
 				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v",
 					filerAddress, x+1, n, retry+1, maxRetries, err)
+				fc.recordFilerFailure(i)
 				lastErr = err
 				i++
 				if i >= n {
@@ -248,8 +308,9 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 				continue
 			}
 
-			// Success - update the preferred filer index for next time
+			// Success - update the preferred filer index and reset health tracking
 			atomic.StoreInt32(&fc.filerIndex, i)
+			fc.recordFilerSuccess(i)
 			glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
 			return result, nil
 		}
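
The helpers added above are small enough to exercise directly. The following is
a hypothetical test sketch, not part of this patch: it assumes a test file in
the same wdclient package (so the unexported filerHealth slice and methods are
reachable) and only sets the fields those methods actually touch.

package wdclient

import (
	"testing"
	"time"
)

// Hypothetical sketch exercising the circuit breaker helpers added by this patch.
func TestFilerCircuitBreaker(t *testing.T) {
	// A FilerClient with a single tracked filer; other fields stay zero-valued.
	fc := &FilerClient{filerHealth: []*filerHealth{{}}}

	// Two consecutive failures stay under the threshold of 3.
	fc.recordFilerFailure(0)
	fc.recordFilerFailure(0)
	if fc.shouldSkipUnhealthyFiler(0) {
		t.Fatal("filer should still be tried after only 2 consecutive failures")
	}

	// The third consecutive failure trips the breaker.
	fc.recordFilerFailure(0)
	if !fc.shouldSkipUnhealthyFiler(0) {
		t.Fatal("filer should be skipped after 3 consecutive failures")
	}

	// Once the 30s re-check window has elapsed, the filer is probed again.
	fc.filerHealth[0].lastFailureTime = time.Now().Add(-31 * time.Second)
	if fc.shouldSkipUnhealthyFiler(0) {
		t.Fatal("filer should be re-checked after the 30s window")
	}

	// A successful lookup resets the failure count.
	fc.recordFilerSuccess(0)
	if fc.shouldSkipUnhealthyFiler(0) {
		t.Fatal("filer should be considered healthy after a success")
	}
}

Backdating lastFailureTime stands in for waiting out the 30-second re-check
window, so the sketch runs instantly.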