From a1cb1dc90fb3908ad5e17deb5a85424a6de22a67 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Nov 2025 10:46:39 -0800 Subject: [PATCH] Integrate health tracking with S3 failover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address code review suggestion to leverage existing health tracking instead of simple iteration through all filers. **Changes:** 1. Added address-based health tracking API to FilerClient: - ShouldSkipUnhealthyFiler(addr) - check circuit breaker - RecordFilerSuccess(addr) - reset failure count - RecordFilerFailure(addr) - increment failure count These methods find the filer by address and delegate to existing *WithHealth methods for actual health management. 2. Updated withFilerClientFailover to use health tracking: - Record success/failure for every filer attempt - Skip unhealthy filers during failover (circuit breaker) - Only try filers that haven't exceeded failure threshold - Automatic re-check after reset timeout **Benefits:** ✅ Circuit breaker prevents wasting time on known-bad filers ✅ Health tracking shared across all operations ✅ Automatic recovery when unhealthy filers come back ✅ Reduced latency - skip filers in failure state ✅ Better visibility with health metrics **Behavior:** - Try current filer first (fast path) - If fails, record failure and try other HEALTHY filers - Skip filers with failureCount >= threshold (default 3) - Re-check unhealthy filers after resetTimeout (default 30s) - Record all successes/failures for health tracking --- weed/s3api/s3api_handlers.go | 25 ++++++++++++----- weed/wdclient/filer_client.go | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 584d51230..6c47e8256 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -35,43 +35,56 @@ func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(file // 
Get current filer as starting point currentFiler := s3a.filerClient.GetCurrentFiler() - // Try current filer first + // Try current filer first (fast path) err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption) if err == nil { + s3a.filerClient.RecordFilerSuccess(currentFiler) return nil } - // Current filer failed - try all other filers - // Note: This is a simple failover implementation - // For production, consider implementing exponential backoff and circuit breakers + // Record failure for current filer + s3a.filerClient.RecordFilerFailure(currentFiler) + + // Current filer failed - try all other filers with health-aware selection filers := s3a.filerClient.GetAllFilers() + var lastErr error = err for _, filer := range filers { if filer == currentFiler { continue // Already tried this one } + // Skip filers known to be unhealthy (circuit breaker pattern) + if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) { + glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer) + continue + } + err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) if err == nil { - // Success! Update current filer for future requests + // Success! 
Record success and update current filer for future requests + s3a.filerClient.RecordFilerSuccess(filer) s3a.filerClient.SetCurrentFiler(filer) glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer) return nil } + // Record failure for health tracking + s3a.filerClient.RecordFilerFailure(filer) glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err) + lastErr = err } // All filers failed - return fmt.Errorf("all filers failed, last error: %w", err) + return fmt.Errorf("all filers failed, last error: %w", lastErr) } func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index 728f5a7b2..695bb2df3 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -233,6 +233,57 @@ func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) { // If address not found, leave index unchanged } +// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking +// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed +func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i]) + } + return false + } + } + // If address not found, don't skip it + return false +} + +// RecordFilerSuccess resets failure tracking for a successful filer +func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + fc.recordFilerSuccessWithHealth(fc.filerHealth[i]) + 
} + return + } + } +} + +// RecordFilerFailure increments the failure count for the filer at addr after a failed request +func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + fc.recordFilerFailureWithHealth(fc.filerHealth[i]) + } + return + } + } +} + // Close stops the filer discovery goroutine if running // Safe to call multiple times (idempotent) func (fc *FilerClient) Close() {