From 01b9b68ac5cf5e681d0af1a12d545169afe81d9b Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 20 Nov 2025 13:14:50 -0800 Subject: [PATCH] fix: FilerClient supports multiple filer addresses for high availability Critical fix: FilerClient now accepts []ServerAddress instead of a single address - Prevents mount failure when first filer is down (regression fix) - Implements automatic failover to remaining filers - Uses round-robin with atomic index tracking (same pattern as WFS.WithFilerClient) - Retries all configured filers before giving up - Updates successful filer index for future requests Changes: - NewFilerClient([]pb.ServerAddress, ...) instead of (pb.ServerAddress, ...) - filerVolumeProvider references FilerClient for failover access - LookupVolumeIds tries all filers with util.Retry pattern - Mount passes all option.FilerAddresses for HA - S3 wraps single filer in slice for API consistency This restores the high availability that existed in the old implementation where mount would automatically fail over between configured filers. 
--- weed/mount/weedfs.go | 3 +- weed/s3api/s3api_server.go | 3 +- weed/wdclient/filer_client.go | 148 +++++++++++++++++++++------------- 3 files changed, 98 insertions(+), 56 deletions(-) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 0c0050d85..73091f0f1 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -102,6 +102,7 @@ type WFS struct { func NewSeaweedFileSystem(option *Option) *WFS { // Create FilerClient for efficient volume location caching + // Pass all filer addresses for high availability with automatic failover // Configure URL preference based on VolumeServerAccess option var opts *wdclient.FilerClientOption if option.VolumeServerAccess == "publicUrl" { @@ -111,7 +112,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { } filerClient := wdclient.NewFilerClient( - option.FilerAddresses[0], + option.FilerAddresses, // Pass all filer addresses for HA option.GrpcDialOption, option.DataCenter, opts, diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 21e4190d0..992027fda 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -95,7 +95,8 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize FilerClient for volume location caching // Uses the battle-tested vidMap with filer-based lookups - filerClient := wdclient.NewFilerClient(option.Filer, option.GrpcDialOption, option.DataCenter) + // S3 API typically connects to a single filer, but wrap in slice for consistency + filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter) glog.V(0).Infof("S3 API initialized FilerClient for volume location caching") s3ApiServer = &S3ApiServer{ diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index 01f46e40f..3f665d775 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync/atomic" "time" 
"google.golang.org/grpc" @@ -11,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" ) // UrlPreference controls which URL to use for volume access @@ -23,9 +25,11 @@ const ( // FilerClient provides volume location services by querying a filer // It uses the shared vidMap cache for efficient lookups +// Supports multiple filer addresses with automatic failover for high availability type FilerClient struct { *vidMapClient - filerAddress pb.ServerAddress + filerAddresses []pb.ServerAddress + filerIndex int32 // atomic: current filer index for round-robin grpcDialOption grpc.DialOption urlPreference UrlPreference grpcTimeout time.Duration @@ -33,10 +37,9 @@ type FilerClient struct { } // filerVolumeProvider implements VolumeLocationProvider by querying filer +// Supports multiple filer addresses with automatic failover type filerVolumeProvider struct { - filerAddress pb.ServerAddress - grpcDialOption grpc.DialOption - grpcTimeout time.Duration + filerClient *FilerClient } // FilerClientOption holds optional configuration for FilerClient @@ -46,9 +49,14 @@ type FilerClientOption struct { CacheSize int // Number of historical vidMap snapshots (0 = use default) } -// NewFilerClient creates a new client that queries filer for volume locations +// NewFilerClient creates a new client that queries filer(s) for volume locations +// Supports multiple filer addresses for high availability with automatic failover // Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize -func NewFilerClient(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient { +func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient { + if len(filerAddresses) == 0 { + 
glog.Fatal("NewFilerClient requires at least one filer address") + } + // Apply defaults grpcTimeout := 5 * time.Second urlPref := PreferUrl @@ -68,20 +76,23 @@ func NewFilerClient(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOptio } } - provider := &filerVolumeProvider{ - filerAddress: filerAddress, - grpcDialOption: grpcDialOption, - grpcTimeout: grpcTimeout, - } - - return &FilerClient{ - vidMapClient: newVidMapClient(provider, dataCenter, cacheSize), - filerAddress: filerAddress, + fc := &FilerClient{ + filerAddresses: filerAddresses, + filerIndex: 0, grpcDialOption: grpcDialOption, urlPreference: urlPref, grpcTimeout: grpcTimeout, cacheSize: cacheSize, } + + // Create provider that references this FilerClient for failover support + provider := &filerVolumeProvider{ + filerClient: fc, + } + + fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize) + + return fc } // GetLookupFileIdFunction returns a lookup function with URL preference handling @@ -123,67 +134,96 @@ func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType { } } -// LookupVolumeIds queries the filer for volume locations +// LookupVolumeIds queries the filer for volume locations with automatic failover +// Tries all configured filer addresses until one succeeds (high availability) // Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have // an Error field. This implementation handles the current structure while being prepared // for future error reporting enhancements. 
func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { + fc := p.filerClient result := make(map[string][]Location) // Create a timeout context for the gRPC call - timeoutCtx, cancel := context.WithTimeout(ctx, p.grpcTimeout) + timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout) defer cancel() // Convert grpcTimeout to milliseconds for the signature parameter - timeoutMs := int32(p.grpcTimeout.Milliseconds()) + timeoutMs := int32(fc.grpcTimeout.Milliseconds()) - err := pb.WithGrpcFilerClient(false, timeoutMs, p.filerAddress, p.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{ - VolumeIds: volumeIds, - }) - if err != nil { - return fmt.Errorf("filer.LookupVolume failed: %w", err) - } + // Try all filer addresses with round-robin starting from current index + var lastErr error + err := util.Retry("filer volume lookup", func() error { + i := atomic.LoadInt32(&fc.filerIndex) + n := int32(len(fc.filerAddresses)) - // Process each volume in the response - for vid, locs := range resp.LocationsMap { - // Convert locations from protobuf to internal format - var locations []Location - for _, loc := range locs.Locations { - locations = append(locations, Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - DataCenter: loc.DataCenter, - GrpcPort: int(loc.GrpcPort), + for x := int32(0); x < n; x++ { + filerAddress := fc.filerAddresses[i] + + err := pb.WithGrpcFilerClient(false, timeoutMs, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{ + VolumeIds: volumeIds, }) + if err != nil { + return fmt.Errorf("filer.LookupVolume failed: %w", err) + } + + // Process each volume in the response + for vid, locs := range resp.LocationsMap { + // Convert locations from protobuf to internal format + var locations 
[]Location + for _, loc := range locs.Locations { + locations = append(locations, Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + DataCenter: loc.DataCenter, + GrpcPort: int(loc.GrpcPort), + }) + } + + // Only add to result if we have locations + // Empty locations with no gRPC error means "not found" (volume doesn't exist) + if len(locations) > 0 { + result[vid] = locations + glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations)) + } else { + glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid) + } + } + + // Check for volumes that weren't in the response at all + // This could indicate a problem with the filer + for _, vid := range volumeIds { + if _, found := resp.LocationsMap[vid]; !found { + glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid) + } + } + + return nil + }) + + if err != nil { + glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d): %v", filerAddress, x+1, n, err) + lastErr = err + i++ + if i >= n { + i = 0 + } + continue } - // Only add to result if we have locations - // Empty locations with no gRPC error means "not found" (volume doesn't exist) - if len(locations) > 0 { - result[vid] = locations - glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations)) - } else { - glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid) - } - } - - // Check for volumes that weren't in the response at all - // This could indicate a problem with the filer - for _, vid := range volumeIds { - if _, found := resp.LocationsMap[vid]; !found { - glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid) - } + // Success - update the preferred filer index for next time + atomic.StoreInt32(&fc.filerIndex, i) + glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result)) + return nil } - return nil + // All filers failed + return 
fmt.Errorf("all %d filer(s) failed, last error: %w", n, lastErr) }) if err != nil { - // gRPC error - this is a communication or server failure return nil, fmt.Errorf("filer volume lookup failed for %d volume(s): %w", len(volumeIds), err) } - glog.V(3).Infof("FilerClient: looked up %d volumes, found %d", len(volumeIds), len(result)) return result, nil }