diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 8cffe38c1..3456b3fc7 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -26,15 +26,37 @@ type ChunkReadAt struct { var _ = io.ReaderAt(&ChunkReadAt{}) var _ = io.Closer(&ChunkReadAt{}) -// LookupFn creates a basic volume location lookup function with simple caching -// DEPRECATED: For mount operations, use wdclient.FilerClient directly for better performance: +// LookupFn creates a basic volume location lookup function with simple caching. +// +// DEPRECATED: This function has several limitations: +// - Unbounded cache growth (no eviction or size limit) +// - No TTL for stale entries +// - No singleflight deduplication +// - No cache history for volume moves +// +// For NEW code, especially mount operations, use wdclient.FilerClient instead: +// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts) +// lookupFn := filerClient.GetLookupFileIdFunction() +// +// This provides: +// - Bounded cache with configurable size // - Singleflight deduplication of concurrent lookups // - Cache history when volumes move // - Battle-tested vidMap with cache chain +// +// This function is kept for backward compatibility with existing code paths +// (shell commands, streaming, etc.) but should be avoided in long-running processes +// or multi-tenant deployments where unbounded memory growth is a concern. +// +// Maximum recommended cache entries: ~10,000 volumes per process. +// Beyond this, consider migrating to wdclient.FilerClient. func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType { vidCache := make(map[string]*filer_pb.Locations) var vicCacheLock sync.RWMutex + cacheSize := 0 + const maxCacheSize = 10000 // Simple bound to prevent unbounded growth + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { vid := VolumeId(fileId) vicCacheLock.RLock() @@ -51,14 +73,22 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp return err } - locations = resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - vicCacheLock.Lock() + locations = resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + vicCacheLock.Lock() + // Simple size limit to prevent unbounded growth + // For proper cache management, use wdclient.FilerClient instead + if cacheSize < maxCacheSize { vidCache[vid] = locations - vicCacheLock.Unlock() + cacheSize++ + } else if cacheSize == maxCacheSize { + glog.Warningf("filer.LookupFn cache reached limit of %d volumes, not caching new entries. Consider migrating to wdclient.FilerClient for bounded cache management.", maxCacheSize) + cacheSize++ // Only log once + } + vicCacheLock.Unlock() return nil }) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 696a4429f..2672bb5b4 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -2,6 +2,7 @@ package wdclient import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -26,8 +27,10 @@ type masterVolumeProvider struct { } // LookupVolumeIds queries the master for volume locations (fallback when cache misses) +// Returns partial results with aggregated errors for volumes that failed func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) + var lookupErrors []error glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds) @@ -40,8 +43,11 @@ func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds [] } for _, vidLoc := range resp.VolumeIdLocations { + // Preserve per-volume errors from master response + // These could indicate misconfiguration, volume deletion, etc. if vidLoc.Error != "" { - glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error)) + glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error) continue } @@ -50,6 +56,7 @@ func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds [] vidOnly := parts[0] vid, err := strconv.ParseUint(vidOnly, 10, 32) if err != nil { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err)) glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) continue } @@ -78,6 +85,13 @@ func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds [] return nil, err } + // Return partial results with detailed errors + // Callers should check both result map and error + if len(lookupErrors) > 0 { + glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors)) + return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...)) + } + glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result)) return result, nil } @@ -290,16 +304,34 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { mc.currentMasterLock.Unlock() } +// GetMaster returns the current master address, blocking until connected. +// +// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes +// a connection to a master server. If KeepConnectedToMaster hasn't been started in a +// background goroutine, this will block indefinitely (or until ctx is canceled). +// +// Typical initialization pattern: +// mc := wdclient.NewMasterClient(...) +// go mc.KeepConnectedToMaster(ctx) // Start connection management +// // ... later ... +// master := mc.GetMaster(ctx) // Will block until connected +// +// If called before KeepConnectedToMaster establishes a connection, this may cause +// unexpected timeouts in LookupVolumeIds and other operations that depend on it. func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { mc.WaitUntilConnected(ctx) return mc.getCurrentMaster() } +// GetMasters returns all configured master addresses, blocking until connected. +// See GetMaster() for important initialization contract details. func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { mc.WaitUntilConnected(ctx) return mc.masters.GetInstances() } +// WaitUntilConnected blocks until a master connection is established or ctx is canceled. +// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed. func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { attempts := 0 for { diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go index a84100ef7..0457c246b 100644 --- a/weed/wdclient/vidmap_client.go +++ b/weed/wdclient/vidmap_client.go @@ -103,8 +103,26 @@ func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId str return fullUrls, nil } -// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache -// Uses singleflight to coalesce concurrent requests for the same batch of volumes +// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache. +// Uses singleflight to coalesce concurrent requests for the same batch of volumes. +// +// IMPORTANT: This function may return PARTIAL results with a non-nil error. +// The result map contains successfully looked up volumes, while the error aggregates +// failures for volumes that couldn't be found or had lookup errors. +// +// Callers MUST check both the result map AND the error: +// - result != nil && err == nil: All volumes found successfully +// - result != nil && err != nil: Some volumes found, some failed (check both) +// - result == nil && err != nil: Complete failure (connection error, etc.) +// +// Example usage: +// locs, err := mc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"}) +// if len(locs) > 0 { +// // Process successfully found volumes +// } +// if err != nil { +// // Log/handle failed volumes +// } func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) var needsLookup []string