
improve: add circuit breaker to skip known-unhealthy filers

The previous implementation tried all filers on every failure, including
known-unhealthy ones. This wasted time retrying permanently down filers.

Problem scenario (3 filers, filer0 is down):
- Last successful: filer1 (saved as filerIndex=1)
- Next lookup when filer1 fails:
  Retry 1: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Retry 2: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Retry 3: filer1(fail) → filer2(fail) → filer0(fail, wastes 5s timeout)
  Total wasted: 15 seconds on a known-bad filer!

Solution: Circuit breaker pattern (see the sketch after this list)
- Track consecutive failures per filer (atomic int32)
- Skip filers with 3+ consecutive failures
- Re-check unhealthy filers every 30 seconds
- Reset failure count on success
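
A minimal sketch of that decision logic (the standalone function name is
illustrative only; the real method, shouldSkipUnhealthyFiler, appears in the
diff below):

    package wdclient // sketch only, not part of this commit

    import "time"

    // shouldSkip mirrors the thresholds described above: tolerate up to 2
    // consecutive failures, then skip the filer until 30s have passed.
    func shouldSkip(consecutiveFailures int32, lastFailure time.Time) bool {
        if consecutiveFailures < 3 {
            return false // still healthy enough to try
        }
        if time.Since(lastFailure) > 30*time.Second {
            return false // re-check window elapsed, try the filer again
        }
        return true // circuit open: skip this filer for now
    }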

New behavior:
- filer0 fails 3 times → marked unhealthy
- Future lookups skip filer0 for 30 seconds
- After 30s, re-check filer0 (allows recovery)
- If filer0 succeeds, reset failure count to 0

Benefits:
1. Avoids wasting time on known-down filers
2. Still sticks to last healthy filer (via filerIndex)
3. Allows recovery (30s re-check window)
4. No configuration needed (automatic)

Implementation details (a test sketch follows this list):
- filerHealth struct tracks failureCount (atomic) + lastFailureTime
- shouldSkipUnhealthyFiler(): checks if we should skip this filer
- recordFilerSuccess(): resets failure count to 0
- recordFilerFailure(): increments count, updates timestamp
- Logs when skipping unhealthy filers (V(2) level)
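
A hypothetical test sketch exercising these helpers (assumes it sits next to
filer_client.go in package wdclient; the test is not part of this commit):

    package wdclient

    import (
        "testing"
        "time"
    )

    func TestFilerCircuitBreaker(t *testing.T) {
        // Hand-built client with a single tracked filer; only the health slice is needed.
        fc := &FilerClient{filerHealth: []*filerHealth{{}}}

        // Up to 2 consecutive failures are tolerated.
        fc.recordFilerFailure(0)
        fc.recordFilerFailure(0)
        if fc.shouldSkipUnhealthyFiler(0) {
            t.Fatal("should not skip after only 2 failures")
        }

        // The third consecutive failure trips the breaker.
        fc.recordFilerFailure(0)
        if !fc.shouldSkipUnhealthyFiler(0) {
            t.Fatal("should skip after 3 consecutive failures")
        }

        // After the 30s window the filer is re-checked.
        fc.filerHealth[0].lastFailureTime = time.Now().Add(-31 * time.Second)
        if fc.shouldSkipUnhealthyFiler(0) {
            t.Fatal("should re-check once the 30s window has passed")
        }

        // A success resets the failure count to 0.
        fc.recordFilerSuccess(0)
        if fc.shouldSkipUnhealthyFiler(0) {
            t.Fatal("should not skip after a success")
        }
    }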

Example with circuit breaker:
- filer0 down (already marked unhealthy), saved filerIndex=1 (filer1 healthy)
- Lookup 1: filer1(ok) → Done (0.01s)
- Lookup 2: filer1(fail) → filer2(ok) → Done, save filerIndex=2 (0.01s)
- Lookup 3: filer2(fail) → skip filer0 (unhealthy) → filer1(ok) → Done (0.01s)

Much better than wasting 15s trying filer0 repeatedly!
chrislu · commit 3a5b5ea02c · pull/7518/head · 2 weeks ago
weed/wdclient/filer_client.go (63 changed lines)

@@ -24,13 +24,21 @@ const (
 	PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
 )
 
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+	failureCount    int32     // atomic: consecutive failures
+	lastFailureTime time.Time // last time this filer failed
+}
+
 // FilerClient provides volume location services by querying a filer
 // It uses the shared vidMap cache for efficient lookups
 // Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
 type FilerClient struct {
 	*vidMapClient
 	filerAddresses []pb.ServerAddress
 	filerIndex     int32 // atomic: current filer index for round-robin
+	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
 	grpcDialOption grpc.DialOption
 	urlPreference  UrlPreference
 	grpcTimeout    time.Duration
@@ -77,9 +85,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		}
 	}
 
+	// Initialize health tracking for each filer
+	health := make([]*filerHealth, len(filerAddresses))
+	for i := range health {
+		health[i] = &filerHealth{}
+	}
+
 	fc := &FilerClient{
 		filerAddresses: filerAddresses,
 		filerIndex:     0,
+		filerHealth:    health,
 		grpcDialOption: grpcDialOption,
 		urlPreference:  urlPref,
 		grpcTimeout:    grpcTimeout,
@@ -166,6 +181,38 @@ func isRetryableGrpcError(err error) bool {
 		strings.Contains(errStr, "unavailable")
 }
 
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+	health := fc.filerHealth[index]
+	failureCount := atomic.LoadInt32(&health.failureCount)
+
+	// Allow up to 2 failures before skipping
+	if failureCount < 3 {
+		return false
+	}
+
+	// Re-check unhealthy filers every 30 seconds
+	if time.Since(health.lastFailureTime) > 30*time.Second {
+		return false // Time to re-check
+	}
+
+	return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+	health := fc.filerHealth[index]
+	atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+	health := fc.filerHealth[index]
+	atomic.AddInt32(&health.failureCount, 1)
+	health.lastFailureTime = time.Now()
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
 // Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
@@ -190,10 +237,22 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 	for retry := 0; retry < maxRetries; retry++ {
 		// Try all filer addresses with round-robin starting from current index
+		// Skip known-unhealthy filers (circuit breaker pattern)
 		i := atomic.LoadInt32(&fc.filerIndex)
 		n := int32(len(fc.filerAddresses))
 
 		for x := int32(0); x < n; x++ {
+			// Circuit breaker: skip unhealthy filers
+			if fc.shouldSkipUnhealthyFiler(i) {
+				glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+					fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+				i++
+				if i >= n {
+					i = 0
+				}
+				continue
+			}
+
 			filerAddress := fc.filerAddresses[i]
 
 			err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -240,6 +299,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 			if err != nil {
 				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
+				fc.recordFilerFailure(i)
 				lastErr = err
 				i++
 				if i >= n {
@@ -248,8 +308,9 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 				continue
 			}
 
-			// Success - update the preferred filer index for next time
+			// Success - update the preferred filer index and reset health tracking
 			atomic.StoreInt32(&fc.filerIndex, i)
+			fc.recordFilerSuccess(i)
 			glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
 
 			return result, nil
