diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index c6c2f8326..8065c4dfd 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -24,13 +24,21 @@ const (
 	PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
 )
 
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+	lastFailureTimeNs int64 // atomic: UnixNano of last failure; kept first for 64-bit alignment on 32-bit platforms
+	failureCount      int32 // atomic: consecutive failures
+}
+
 // FilerClient provides volume location services by querying a filer
 // It uses the shared vidMap cache for efficient lookups
 // Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
 type FilerClient struct {
 	*vidMapClient
 	filerAddresses []pb.ServerAddress
 	filerIndex     int32          // atomic: current filer index for round-robin
+	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
 	grpcDialOption grpc.DialOption
 	urlPreference  UrlPreference
 	grpcTimeout    time.Duration
@@ -77,9 +85,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		}
 	}
 
+	// Initialize health tracking for each filer
+	health := make([]*filerHealth, len(filerAddresses))
+	for i := range health {
+		health[i] = &filerHealth{}
+	}
+
 	fc := &FilerClient{
 		filerAddresses: filerAddresses,
 		filerIndex:     0,
+		filerHealth:    health,
 		grpcDialOption: grpcDialOption,
 		urlPreference:  urlPref,
 		grpcTimeout:    grpcTimeout,
@@ -166,6 +181,38 @@ func isRetryableGrpcError(err error) bool {
 		strings.Contains(errStr, "unavailable")
 }
 
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+	health := fc.filerHealth[index]
+	failureCount := atomic.LoadInt32(&health.failureCount)
+
+	// Allow up to 2 consecutive failures before tripping the breaker
+	if failureCount < 3 {
+		return false
+	}
+
+	// Re-check unhealthy filers every 30 seconds
+	if time.Since(time.Unix(0, atomic.LoadInt64(&health.lastFailureTimeNs))) > 30*time.Second {
+		return false // Time to re-check
+	}
+
+	return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+	health := fc.filerHealth[index]
+	atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+	health := fc.filerHealth[index]
+	atomic.AddInt32(&health.failureCount, 1)
+	atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
 // Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
@@ -190,10 +237,22 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	for retry := 0; retry < maxRetries; retry++ {
 		// Try all filer addresses with round-robin starting from current index
+		// Skip known-unhealthy filers (circuit breaker pattern)
 		i := atomic.LoadInt32(&fc.filerIndex)
 		n := int32(len(fc.filerAddresses))
 		for x := int32(0); x < n; x++ {
+			// Circuit breaker: skip unhealthy filers
+			if fc.shouldSkipUnhealthyFiler(i) {
+				glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+					fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+				i++
+				if i >= n {
+					i = 0
+				}
+				continue
+			}
+
 			filerAddress := fc.filerAddresses[i]
 			err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -240,6 +299,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			if err != nil {
 				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v",
 					filerAddress, x+1, n, retry+1, maxRetries, err)
+				fc.recordFilerFailure(i)
 				lastErr = err
 				i++
 				if i >= n {
@@ -248,8 +308,9 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			continue
 		}
 
-		// Success - update the preferred filer index for next time
+		// Success - update the preferred filer index and reset health tracking
 		atomic.StoreInt32(&fc.filerIndex, i)
+		fc.recordFilerSuccess(i)
 		glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
 		return result, nil
 	}
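For reviewers who want to exercise the breaker thresholds in isolation, below is a minimal, self-contained sketch of the same trip-after-3-failures, re-probe-after-30-seconds behavior. The `breaker` type and its method names are illustrative only, not part of this patch:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// breaker mirrors the per-filer health tracking in the patch:
// trip after 3 consecutive failures, allow a re-probe after 30s.
type breaker struct {
	lastFailureNs int64 // atomic: UnixNano of last failure; first field for 64-bit alignment on 32-bit platforms
	failures      int32 // atomic: consecutive failures
}

// fail records one more consecutive failure and stamps the failure time.
func (b *breaker) fail() {
	atomic.AddInt32(&b.failures, 1)
	atomic.StoreInt64(&b.lastFailureNs, time.Now().UnixNano())
}

// succeed fully resets the failure count.
func (b *breaker) succeed() { atomic.StoreInt32(&b.failures, 0) }

// skip reports whether callers should bypass this target for now.
func (b *breaker) skip() bool {
	if atomic.LoadInt32(&b.failures) < 3 {
		return false // under the threshold: keep trying
	}
	last := time.Unix(0, atomic.LoadInt64(&b.lastFailureNs))
	return time.Since(last) <= 30*time.Second // past the cooldown: probe again
}

func main() {
	var b breaker
	for i := 0; i < 3; i++ {
		b.fail()
	}
	fmt.Println(b.skip()) // true: tripped after 3 consecutive failures
	b.succeed()
	fmt.Println(b.skip()) // false: a single success resets the count
}
```

One consequence worth keeping in mind: a single success resets the count to zero, so a flapping filer earns three fresh attempts after every successful lookup, while a tripped filer that fails its post-cooldown probe is skipped again immediately.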