
Integrate health tracking with S3 failover

Address a code review suggestion: leverage the existing health tracking
instead of simply iterating through all filers.

**Changes:**

1. Added address-based health tracking API to FilerClient:
   - ShouldSkipUnhealthyFiler(addr) - check circuit breaker
   - RecordFilerSuccess(addr) - reset failure count
   - RecordFilerFailure(addr) - increment failure count

   These methods find the filer by address and delegate to
   existing *WithHealth methods for actual health management.

2. Updated withFilerClientFailover to use health tracking (see the Go sketch after this list):
   - Record success/failure for every filer attempt
   - Skip unhealthy filers during failover (circuit breaker)
   - Only try filers that haven't exceeded failure threshold
   - Automatic re-check after reset timeout
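
The overall shape of that health-aware failover loop is sketched below. This is a minimal, self-contained illustration: the `healthTracker` interface, the `tryFiler` callback, and the `failover` function are hypothetical stand-ins for the FilerClient/S3 handler code shown in the diff further down, not part of the SeaweedFS API.

```go
// Hypothetical sketch only: healthTracker, tryFiler, and failover are
// illustrative names, not SeaweedFS APIs.
package sketch

import "fmt"

// healthTracker is the minimal surface the failover loop needs.
type healthTracker interface {
	ShouldSkip(addr string) bool // circuit breaker: true while a filer is considered unhealthy
	RecordSuccess(addr string)   // reset the failure count for addr
	RecordFailure(addr string)   // increment the failure count for addr
}

// failover tries the current filer first, then the remaining healthy filers,
// recording every success and failure so the health state stays up to date.
func failover(current string, all []string, h healthTracker, tryFiler func(addr string) error) error {
	// Fast path: the filer that served the previous request.
	err := tryFiler(current)
	if err == nil {
		h.RecordSuccess(current)
		return nil
	}
	h.RecordFailure(current)

	lastErr := err
	for _, addr := range all {
		if addr == current {
			continue // already tried above
		}
		if h.ShouldSkip(addr) {
			continue // circuit breaker is open for this filer
		}
		err = tryFiler(addr)
		if err == nil {
			h.RecordSuccess(addr)
			return nil
		}
		h.RecordFailure(addr)
		lastErr = err
	}
	return fmt.Errorf("all filers failed, last error: %w", lastErr)
}
```

The real handler additionally calls SetCurrentFiler on a successful failover so that subsequent requests go straight to the filer that is known to work.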

**Benefits:**

- Circuit breaker prevents wasting time on known-bad filers
- Health tracking shared across all operations
- Automatic recovery when unhealthy filers come back
- Reduced latency - skip filers in failure state
- Better visibility with health metrics

**Behavior:**

- Try current filer first (fast path)
- If it fails, record the failure and try the remaining healthy filers
- Skip filers with failureCount >= threshold (default 3)
- Re-check unhealthy filers after resetTimeout (default 30s), as in the sketch after this list
- Record all successes/failures for health tracking
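
The existing *WithHealth helpers that implement the threshold and reset-timeout logic are not part of this diff. Below is a minimal sketch of the kind of check they are expected to perform; the `filerHealth` field names and the `failureThreshold`/`resetTimeout` constants are assumptions taken from the defaults quoted above, not the actual FilerClient implementation.

```go
// Assumed illustration of the circuit-breaker decision described above;
// names and defaults are assumptions, not the actual SeaweedFS code.
package sketch

import (
	"sync/atomic"
	"time"
)

const (
	failureThreshold = 3                // consecutive failures before a filer is skipped
	resetTimeout     = 30 * time.Second // how long to skip a filer before re-checking it
)

// filerHealth holds per-filer failure state.
type filerHealth struct {
	failureCount int64 // consecutive failures, reset to zero on success
	lastFailure  int64 // time of the most recent failure, in Unix nanoseconds
}

// shouldSkip reports whether the circuit breaker is currently open for this filer.
// Once resetTimeout has elapsed since the last failure it returns false again,
// which is what lets an unhealthy filer be re-checked automatically.
func (h *filerHealth) shouldSkip() bool {
	if atomic.LoadInt64(&h.failureCount) < failureThreshold {
		return false
	}
	last := time.Unix(0, atomic.LoadInt64(&h.lastFailure))
	return time.Since(last) < resetTimeout
}

// recordSuccess resets the failure count so the filer is considered healthy again.
func (h *filerHealth) recordSuccess() {
	atomic.StoreInt64(&h.failureCount, 0)
}

// recordFailure bumps the failure count and remembers when the failure happened.
func (h *filerHealth) recordFailure() {
	atomic.AddInt64(&h.failureCount, 1)
	atomic.StoreInt64(&h.lastFailure, time.Now().UnixNano())
}
```

With this shape, a filer that keeps failing is skipped for roughly resetTimeout and then given another chance, and a single successful call clears its failure count.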
Chris Lu · branch pull/7550/head · commit a1cb1dc90f

Changed files:
1. weed/s3api/s3api_handlers.go (25 lines changed)
2. weed/wdclient/filer_client.go (51 lines changed)
weed/s3api/s3api_handlers.go

@@ -35,43 +35,56 @@ func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(file
 	// Get current filer as starting point
 	currentFiler := s3a.filerClient.GetCurrentFiler()

-	// Try current filer first
+	// Try current filer first (fast path)
 	err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
 		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
 		return fn(client)
 	}, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption)

 	if err == nil {
+		s3a.filerClient.RecordFilerSuccess(currentFiler)
 		return nil
 	}

-	// Current filer failed - try all other filers
-	// Note: This is a simple failover implementation
-	// For production, consider implementing exponential backoff and circuit breakers
+	// Record failure for current filer
+	s3a.filerClient.RecordFilerFailure(currentFiler)
+
+	// Current filer failed - try all other filers with health-aware selection
 	filers := s3a.filerClient.GetAllFilers()
+	var lastErr error = err
 	for _, filer := range filers {
 		if filer == currentFiler {
 			continue // Already tried this one
 		}

+		// Skip filers known to be unhealthy (circuit breaker pattern)
+		if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) {
+			glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer)
+			continue
+		}
+
 		err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
 			client := filer_pb.NewSeaweedFilerClient(grpcConnection)
 			return fn(client)
 		}, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)

 		if err == nil {
-			// Success! Update current filer for future requests
+			// Success! Record success and update current filer for future requests
+			s3a.filerClient.RecordFilerSuccess(filer)
 			s3a.filerClient.SetCurrentFiler(filer)
 			glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer)
 			return nil
 		}

+		// Record failure for health tracking
+		s3a.filerClient.RecordFilerFailure(filer)
 		glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err)
+		lastErr = err
 	}

 	// All filers failed
-	return fmt.Errorf("all filers failed, last error: %w", err)
+	return fmt.Errorf("all filers failed, last error: %w", lastErr)
 }

 func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {

weed/wdclient/filer_client.go

@@ -233,6 +233,57 @@ func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
 	// If address not found, leave index unchanged
 }

+// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking
+// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed
+func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i])
+			}
+			return false
+		}
+	}
+	// If address not found, don't skip it
+	return false
+}
+
+// RecordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				fc.recordFilerSuccessWithHealth(fc.filerHealth[i])
+			}
+			return
+		}
+	}
+}
+
+// RecordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				fc.recordFilerFailureWithHealth(fc.filerHealth[i])
+			}
+			return
+		}
+	}
+}
+
 // Close stops the filer discovery goroutine if running
 // Safe to call multiple times (idempotent)
 func (fc *FilerClient) Close() {
