From 5a1eed08355bb141b6cd3fc11d1597561eee5a06 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 20 Nov 2025 12:30:22 -0800 Subject: [PATCH] refactor: mount uses FilerClient for efficient volume location caching - Add configurable vidMap cache size (default: 5 historical snapshots) - Add FilerClientOption struct for clean configuration * GrpcTimeout: default 5 seconds (prevents hanging requests) * UrlPreference: PreferUrl or PreferPublicUrl * CacheSize: number of historical vidMap snapshots (for volume moves) - NewFilerClient uses option struct for better API extensibility - Improved error handling in filerVolumeProvider.LookupVolumeIds: * Distinguish genuine 'not found' from communication failures * Log volumes missing from filer response * Return proper error context with volume count * Document that filer Locations lacks Error field (unlike master) - FilerClient.GetLookupFileIdFunction() handles URL preference automatically - Mount (WFS) creates FilerClient with appropriate options - Benefits for weed mount: * Singleflight: Deduplicates concurrent volume lookups * Cache history: Old volume locations available briefly when volumes move * Configurable cache depth: Tune for different deployment environments * Battle-tested vidMap cache with cache chain * Better concurrency handling with timeout protection * Improved error visibility and debugging - Old filer.LookupFn() kept for backward compatibility - Performance improvement for mount operations with high concurrency --- weed/filer/reader_at.go | 5 ++ weed/mount/weedfs.go | 21 +++++- weed/wdclient/filer_client.go | 122 +++++++++++++++++++++++++++++++-- weed/wdclient/masterclient.go | 4 +- weed/wdclient/vidmap_client.go | 13 +++- 5 files changed, 154 insertions(+), 11 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 27d773f49..8cffe38c1 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -26,6 +26,11 @@ type ChunkReadAt struct { var _ = io.ReaderAt(&ChunkReadAt{}) var _ = io.Closer(&ChunkReadAt{}) +// LookupFn creates a basic volume location lookup function with simple caching +// DEPRECATED: For mount operations, use wdclient.FilerClient directly for better performance: +// - Singleflight deduplication of concurrent lookups +// - Cache history when volumes move +// - Battle-tested vidMap with cache chain func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType { vidCache := make(map[string]*filer_pb.Locations) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 95864ef00..0c0050d85 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -97,9 +97,26 @@ type WFS struct { fhLockTable *util.LockTable[FileHandleId] rdmaClient *RDMAMountClient FilerConf *filer.FilerConf + filerClient *wdclient.FilerClient // Cached volume location client } func NewSeaweedFileSystem(option *Option) *WFS { + // Create FilerClient for efficient volume location caching + // Configure URL preference based on VolumeServerAccess option + var opts *wdclient.FilerClientOption + if option.VolumeServerAccess == "publicUrl" { + opts = &wdclient.FilerClientOption{ + UrlPreference: wdclient.PreferPublicUrl, + } + } + + filerClient := wdclient.NewFilerClient( + option.FilerAddresses[0], + option.GrpcDialOption, + option.DataCenter, + opts, + ) + wfs := &WFS{ RawFileSystem: fuse.NewDefaultRawFileSystem(), option: option, @@ -107,6 +124,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), fhMap: NewFileHandleToInode(), dhMap: NewDirectoryHandleToInode(), + filerClient: filerClient, fhLockTable: util.NewLockTable[FileHandleId](), } @@ -253,7 +271,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil } } - return filer.LookupFn(wfs) + // Use the cached FilerClient for efficient lookups with singleflight and cache history + return wfs.filerClient.GetLookupFileIdFunction() } func (wfs *WFS) getCurrentFiler() pb.ServerAddress { diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index 3aa153565..01f46e40f 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -3,6 +3,8 @@ package wdclient import ( "context" "fmt" + "strings" + "time" "google.golang.org/grpc" @@ -11,47 +13,141 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) +// UrlPreference controls which URL to use for volume access +type UrlPreference string + +const ( + PreferUrl UrlPreference = "url" // Use private URL (default) + PreferPublicUrl UrlPreference = "publicUrl" // Use public URL +) + // FilerClient provides volume location services by querying a filer // It uses the shared vidMap cache for efficient lookups type FilerClient struct { *vidMapClient filerAddress pb.ServerAddress grpcDialOption grpc.DialOption + urlPreference UrlPreference + grpcTimeout time.Duration + cacheSize int // Number of historical vidMap snapshots to keep } // filerVolumeProvider implements VolumeLocationProvider by querying filer type filerVolumeProvider struct { filerAddress pb.ServerAddress grpcDialOption grpc.DialOption + grpcTimeout time.Duration +} + +// FilerClientOption holds optional configuration for FilerClient +type FilerClientOption struct { + GrpcTimeout time.Duration + UrlPreference UrlPreference + CacheSize int // Number of historical vidMap snapshots (0 = use default) } // NewFilerClient creates a new client that queries filer for volume locations -func NewFilerClient(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string) *FilerClient { +// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize +func NewFilerClient(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient { + // Apply defaults + grpcTimeout := 5 * time.Second + urlPref := PreferUrl + cacheSize := DefaultVidMapCacheSize + + // Override with provided options + if len(opts) > 0 && opts[0] != nil { + opt := opts[0] + if opt.GrpcTimeout > 0 { + grpcTimeout = opt.GrpcTimeout + } + if opt.UrlPreference != "" { + urlPref = opt.UrlPreference + } + if opt.CacheSize > 0 { + cacheSize = opt.CacheSize + } + } + provider := &filerVolumeProvider{ filerAddress: filerAddress, grpcDialOption: grpcDialOption, + grpcTimeout: grpcTimeout, } return &FilerClient{ - vidMapClient: newVidMapClient(provider, dataCenter), + vidMapClient: newVidMapClient(provider, dataCenter, cacheSize), filerAddress: filerAddress, grpcDialOption: grpcDialOption, + urlPreference: urlPref, + grpcTimeout: grpcTimeout, + cacheSize: cacheSize, + } +} + +// GetLookupFileIdFunction returns a lookup function with URL preference handling +func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + if fc.urlPreference == PreferUrl { + // Use the default implementation from vidMapClient + return fc.vidMapClient.GetLookupFileIdFunction() + } + + // Custom implementation that prefers PublicUrl + return func(ctx context.Context, fileId string) (fullUrls []string, err error) { + // Parse file ID to extract volume ID + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid fileId format: %s", fileId) + } + volumeIdStr := parts[0] + + // First try the cache using LookupVolumeIdsWithFallback + vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr}) + if err != nil { + return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err) + } + + locations, found := vidLocations[volumeIdStr] + if !found || len(locations) == 0 { + return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId) + } + + // Build URLs with publicUrl preference + for _, loc := range locations { + url := loc.PublicUrl + if url == "" { + url = loc.Url + } + fullUrls = append(fullUrls, "http://"+url+"/"+fileId) + } + return fullUrls, nil } } // LookupVolumeIds queries the filer for volume locations +// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have +// an Error field. This implementation handles the current structure while being prepared +// for future error reporting enhancements. func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) - err := pb.WithGrpcFilerClient(false, 0, p.filerAddress, p.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + // Create a timeout context for the gRPC call + timeoutCtx, cancel := context.WithTimeout(ctx, p.grpcTimeout) + defer cancel() + + // Convert grpcTimeout to milliseconds for the signature parameter + timeoutMs := int32(p.grpcTimeout.Milliseconds()) + + err := pb.WithGrpcFilerClient(false, timeoutMs, p.filerAddress, p.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{ VolumeIds: volumeIds, }) if err != nil { return fmt.Errorf("filer.LookupVolume failed: %w", err) } + // Process each volume in the response for vid, locs := range resp.LocationsMap { + // Convert locations from protobuf to internal format var locations []Location for _, loc := range locs.Locations { locations = append(locations, Location{ @@ -61,8 +157,22 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s GrpcPort: int(loc.GrpcPort), }) } + + // Only add to result if we have locations + // Empty locations with no gRPC error means "not found" (volume doesn't exist) if len(locations) > 0 { result[vid] = locations + glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations)) + } else { + glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid) + } + } + + // Check for volumes that weren't in the response at all + // This could indicate a problem with the filer + for _, vid := range volumeIds { + if _, found := resp.LocationsMap[vid]; !found { + glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid) } } @@ -70,10 +180,10 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s }) if err != nil { - return nil, err + // gRPC error - this is a communication or server failure + return nil, fmt.Errorf("filer volume lookup failed for %d volume(s): %w", len(volumeIds), err) } glog.V(3).Infof("FilerClient: looked up %d volumes, found %d", len(volumeIds), len(result)) return result, nil } - diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 344523c0e..42016f416 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -112,8 +112,8 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy // Create provider that references this MasterClient provider := &masterVolumeProvider{masterClient: mc} - // Initialize embedded vidMapClient with the provider - mc.vidMapClient = newVidMapClient(provider, clientDataCenter) + // Initialize embedded vidMapClient with the provider and default cache size + mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize) return mc } diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go index 8b754915c..51d9855f9 100644 --- a/weed/wdclient/vidmap_client.go +++ b/weed/wdclient/vidmap_client.go @@ -32,11 +32,20 @@ type vidMapClient struct { vidLookupGroup singleflight.Group } +const ( + // DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep + // This provides cache history when volumes move between servers + DefaultVidMapCacheSize = 5 +) + // newVidMapClient creates a new client with the given provider and data center -func newVidMapClient(provider VolumeLocationProvider, dataCenter string) *vidMapClient { +func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient { + if cacheSize <= 0 { + cacheSize = DefaultVidMapCacheSize + } return &vidMapClient{ vidMap: newVidMap(dataCenter), - vidMapCacheSize: 5, + vidMapCacheSize: cacheSize, provider: provider, } }