diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index db5ebee10..81dc2ecc5 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sort" "strconv" "strings" "sync" @@ -39,21 +40,19 @@ type MasterClient struct { OnPeerUpdateLock sync.RWMutex // Per-volume-ID in-flight tracking to prevent duplicate lookups - vidLookupLock sync.Mutex - vidLookupInFlight map[string]*singleflight.Group // volumeId -> singleflight group + vidLookupGroup singleflight.Group } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { return &MasterClient{ - FilerGroup: filerGroup, - clientType: clientType, - clientHost: clientHost, - rack: rack, - masters: masters, - grpcDialOption: grpcDialOption, - vidMap: newVidMap(clientDataCenter), - vidMapCacheSize: 5, - vidLookupInFlight: make(map[string]*singleflight.Group), + FilerGroup: filerGroup, + clientType: clientType, + clientHost: clientHost, + rack: rack, + masters: masters, + grpcDialOption: grpcDialOption, + vidMap: newVidMap(clientDataCenter), + vidMapCacheSize: 5, } } @@ -104,10 +103,11 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str } // LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache -// Uses per-volume-ID singleflight to prevent duplicate lookups of the same volume +// Uses singleflight keyed on the sorted batch of missing volume IDs, so identical concurrent batches share one batched master query func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) var needsLookup []string + var lookupErrors []error // Check cache first and separate volumes that need lookup for _, vidString := range volumeIds { @@ -127,86 +127,103 @@ func (mc *MasterClient) 
LookupVolumeIdsWithFallback(ctx context.Context, volumeI return result, nil } - // For each volume that needs lookup, use per-volume singleflight - // to prevent duplicate master queries for the same volume ID - for _, vidString := range needsLookup { - // Get or create singleflight group for this volume ID - mc.vidLookupLock.Lock() - group, exists := mc.vidLookupInFlight[vidString] - if !exists { - group = &singleflight.Group{} - mc.vidLookupInFlight[vidString] = group - } - mc.vidLookupLock.Unlock() + // Batch query all missing volumes using singleflight on the batch key + // Sort for stable key to coalesce identical batches + sort.Strings(needsLookup) + batchKey := strings.Join(needsLookup, ",") + + sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) { + // Double-check cache for volumes that might have been populated while waiting + stillNeedLookup := make([]string, 0, len(needsLookup)) + batchResult := make(map[string][]Location) - // Use singleflight to ensure only one lookup per volume ID - sfResult, err, _ := group.Do(vidString, func() (interface{}, error) { - // Double-check cache in case it was populated while we were waiting + for _, vidString := range needsLookup { vid, _ := strconv.ParseUint(vidString, 10, 32) if locations, found := mc.GetLocations(uint32(vid)); found && len(locations) > 0 { - return locations, nil + batchResult[vidString] = locations + } else { + stillNeedLookup = append(stillNeedLookup, vidString) } + } - // Query master for this volume - glog.V(2).Infof("Looking up volume %s from master", vidString) - var locations []Location + if len(stillNeedLookup) == 0 { + return batchResult, nil + } - err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: []string{vidString}, - }) - if err != nil { - return fmt.Errorf("master lookup failed: %v", err) - } + // 
Query master with batched volume IDs + glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup) - for _, vidLoc := range resp.VolumeIdLocations { - if vidLoc.Error != "" { - return fmt.Errorf("volume %s lookup error: %s", vidString, vidLoc.Error) - } + err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: stillNeedLookup, + }) + if err != nil { + return fmt.Errorf("master lookup failed: %v", err) + } - // Parse volume ID from response - parts := strings.Split(vidLoc.VolumeOrFileId, ",") - vidOnly := parts[0] - vid, err := strconv.ParseUint(vidOnly, 10, 32) - if err != nil { - return fmt.Errorf("failed to parse volume id '%s': %v", vidOnly, err) - } + for _, vidLoc := range resp.VolumeIdLocations { + if vidLoc.Error != "" { + glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + continue + } - for _, masterLoc := range vidLoc.Locations { - loc := Location{ - Url: masterLoc.Url, - PublicUrl: masterLoc.PublicUrl, - GrpcPort: int(masterLoc.GrpcPort), - DataCenter: masterLoc.DataCenter, - } - mc.vidMap.addLocation(uint32(vid), loc) - locations = append(locations, loc) + // Parse volume ID from response + parts := strings.Split(vidLoc.VolumeOrFileId, ",") + vidOnly := parts[0] + vid, err := strconv.ParseUint(vidOnly, 10, 32) + if err != nil { + glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) + continue + } + + var locations []Location + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, } + mc.vidMap.addLocation(uint32(vid), loc) + locations = append(locations, loc) } - return nil - }) - if err != nil { - return nil, err + if 
len(locations) > 0 { + batchResult[vidOnly] = locations + } } - return locations, nil + return nil }) - // Clean up the singleflight group for this volume - mc.vidLookupLock.Lock() - delete(mc.vidLookupInFlight, vidString) - mc.vidLookupLock.Unlock() - if err != nil { - glog.Warningf("Failed to lookup volume %s: %v", vidString, err) - continue // Continue with other volumes + return batchResult, err } + return batchResult, nil + }) - if locations, ok := sfResult.([]Location); ok && len(locations) > 0 { - result[vidString] = locations + if err != nil { + lookupErrors = append(lookupErrors, err) + } + + // Merge singleflight batch results + if batchLocations, ok := sfResult.(map[string][]Location); ok { + for vid, locs := range batchLocations { + result[vid] = locs + } + } + + // Check for volumes that still weren't found + for _, vidString := range needsLookup { + if _, found := result[vidString]; !found { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString)) } } + // Return aggregated errors + if len(lookupErrors) > 0 { + return result, fmt.Errorf("lookup errors: %v", lookupErrors) + } + return result, nil }