diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index ffeedc968..6c47e8256 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc" @@ -15,7 +16,13 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - + // Use filerClient for proper connection management and failover + if s3a.filerClient != nil { + return s3a.withFilerClientFailover(streamingMode, fn) + } + + // Fallback to direct connection if filerClient not initialized + // This should only happen during initialization or testing return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) @@ -23,6 +30,63 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea } +// withFilerClientFailover attempts to execute fn with automatic failover to other filers +func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + // Get current filer as starting point + currentFiler := s3a.filerClient.GetCurrentFiler() + + // Try current filer first (fast path) + err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + s3a.filerClient.RecordFilerSuccess(currentFiler) + return nil + } + + // Record failure for current filer + s3a.filerClient.RecordFilerFailure(currentFiler) + + // Current filer failed - try all other filers with health-aware selection + filers := s3a.filerClient.GetAllFilers() + var lastErr error = err + + for _, filer := range filers { + if filer == currentFiler { + continue // Already tried this one + } + + // Skip filers known to be unhealthy (circuit breaker pattern) + if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) { + glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer) + continue + } + + err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + // Success! Record success and update current filer for future requests + s3a.filerClient.RecordFilerSuccess(filer) + s3a.filerClient.SetCurrentFiler(filer) + glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer) + return nil + } + + // Record failure for health tracking + s3a.filerClient.RecordFilerFailure(filer) + glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err) + lastErr = err + } + + // All filers failed + return fmt.Errorf("all filers failed, last error: %w", lastErr) +} + func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { return location.Url } diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index 65195ee1e..157376f5b 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -1,10 +1,11 @@ package weed_server import ( - "github.com/seaweedfs/seaweedfs/weed/util/version" "net/http" "time" + "github.com/seaweedfs/seaweedfs/weed/util/version" + hashicorpRaft "github.com/hashicorp/raft" "github.com/seaweedfs/raft" @@ -14,7 +15,7 @@ import ( func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) - infos["Up Time"] = time.Now().Sub(startTime).String() + infos["Up Time"] = time.Since(startTime).Truncate(time.Second).String() infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId() ms.Topo.RaftServerAccessLock.RLock() diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 5679eb483..a8dabf507 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -18,7 +18,7 @@ import ( func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+version.VERSION) infos := make(map[string]interface{}) - infos["Up Time"] = time.Now().Sub(startTime).String() + infos["Up Time"] = time.Since(startTime).Truncate(time.Second).String() var ds []*volume_server_pb.DiskStatus for _, loc := range vs.store.Locations { if dir, e := filepath.Abs(loc.Directory); e == nil { diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index 0c6719c6f..2222575d6 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -169,7 +169,8 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO } // Start filer discovery if master client is configured - if masterClient != nil && filerGroup != "" { + // Empty filerGroup is valid (represents default group) + if masterClient != nil { fc.stopDiscovery = make(chan struct{}) go fc.discoverFilers() glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval) @@ -205,6 +206,85 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress { return fc.filerAddresses[index] } +// GetAllFilers returns a snapshot of all filer addresses +// Returns a copy to avoid concurrent modification issues +func (fc *FilerClient) GetAllFilers() []pb.ServerAddress { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Return a copy to avoid concurrent modification + filers := make([]pb.ServerAddress, len(fc.filerAddresses)) + copy(filers, fc.filerAddresses) + return filers +} + +// SetCurrentFiler updates the current filer index to the specified address +// This is useful after successful failover to prefer the healthy filer for future requests +func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the index of the specified filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + atomic.StoreInt32(&fc.filerIndex, int32(i)) + return + } + } + // If address not found, leave index unchanged +} + +// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking +// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed +func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i]) + } + return false + } + } + // If address not found, don't skip it + return false +} + +// RecordFilerSuccess resets failure tracking for a successful filer +func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + fc.recordFilerSuccessWithHealth(fc.filerHealth[i]) + } + return + } + } +} + +// RecordFilerFailure increments failure count for an unhealthy filer +func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) { + fc.filerAddressesMu.RLock() + defer fc.filerAddressesMu.RUnlock() + + // Find the health for this filer address + for i, filer := range fc.filerAddresses { + if filer == addr { + if i < len(fc.filerHealth) { + fc.recordFilerFailureWithHealth(fc.filerHealth[i]) + } + return + } + } +} + // Close stops the filer discovery goroutine if running // Safe to call multiple times (idempotent) func (fc *FilerClient) Close() { @@ -425,7 +505,7 @@ func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) b // Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead // This function is kept for backward compatibility but requires array access -// Note: Accesses filerHealth without lock; safe only when discovery is disabled +// Note: This function is now thread-safe. func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool { fc.filerAddressesMu.RLock() if index >= int32(len(fc.filerHealth)) {