diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a18c55bb1..02eceebde 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -17,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { @@ -94,31 +94,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol LocationsMap: make(map[string]*filer_pb.Locations), } - for _, vidString := range req.VolumeIds { - vid, err := strconv.Atoi(vidString) - if err != nil { - glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid) - return nil, err - } - var locs []*filer_pb.Location - locations, found := fs.filer.MasterClient.GetLocations(uint32(vid)) - if !found { - continue - } - for _, loc := range locations { - locs = append(locs, &filer_pb.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - GrpcPort: uint32(loc.GrpcPort), - DataCenter: loc.DataCenter, - }) - } + // Use master client's lookup with fallback - it handles cache and master query + vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds) + + // Convert wdclient.Location to filer_pb.Location + // Return partial results even if there was an error + for vidString, locations := range vidLocations { resp.LocationsMap[vidString] = &filer_pb.Locations{ - Locations: locs, + Locations: wdclientLocationsToPb(locations), } } - return resp, nil + return resp, err +} + +func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location { + locs := make([]*filer_pb.Location, 0, len(locations)) + for _, loc := range locations { + locs = append(locs, &filer_pb.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + GrpcPort: uint32(loc.GrpcPort), + DataCenter: loc.DataCenter, + }) + } + return locs } func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 11b58d861..f3950bc37 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -2,11 +2,17 @@ package wdclient import ( "context" + "errors" "fmt" "math/rand" + "sort" + "strconv" + "strings" "sync" "time" + "golang.org/x/sync/singleflight" + "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -29,10 +35,16 @@ type MasterClient struct { masters pb.ServerDiscovery grpcDialOption grpc.DialOption + // TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently + // This embedded *vidMap should be changed to a private field protected by sync.RWMutex + // See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER] *vidMap vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex + + // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes + vidLookupGroup singleflight.Group } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { @@ -59,39 +71,168 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { } func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { + // Try cache first using the fast path fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: []string{fileId}, - }) + // Extract volume ID from file ID (format: "volumeId,needle_id_cookie") + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid fileId %s", fileId) + } + volumeId := parts[0] + + // Use shared lookup logic with batching and singleflight + vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId}) + if err != nil { + return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err) + } + + locations, found := vidLocations[volumeId] + if !found || len(locations) == 0 { + return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId) + } + + // Build HTTP URLs from locations, preferring same data center + var sameDcUrls, otherDcUrls []string + for _, loc := range locations { + httpUrl := "http://" + loc.Url + "/" + fileId + if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { + sameDcUrls = append(sameDcUrls, httpUrl) + } else { + otherDcUrls = append(otherDcUrls, httpUrl) + } + } + + // Prefer same data center + fullUrls = append(sameDcUrls, otherDcUrls...) + return fullUrls, nil +} + +// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache +// Uses singleflight to coalesce concurrent requests for the same batch of volumes +func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { + result := make(map[string][]Location) + var needsLookup []string + var lookupErrors []error + + // Check cache first and parse volume IDs once + vidStringToUint := make(map[string]uint32, len(volumeIds)) + for _, vidString := range volumeIds { + vid, err := strconv.ParseUint(vidString, 10, 32) if err != nil { - return fmt.Errorf("LookupVolume %s failed: %v", fileId, err) + return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err) + } + vidStringToUint[vidString] = uint32(vid) + + locations, found := mc.GetLocations(uint32(vid)) + if found && len(locations) > 0 { + result[vidString] = locations + } else { + needsLookup = append(needsLookup, vidString) + } + } + + if len(needsLookup) == 0 { + return result, nil + } + + // Batch query all missing volumes using singleflight on the batch key + // Sort for stable key to coalesce identical batches + sort.Strings(needsLookup) + batchKey := strings.Join(needsLookup, ",") + + sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) { + // Double-check cache for volumes that might have been populated while waiting + stillNeedLookup := make([]string, 0, len(needsLookup)) + batchResult := make(map[string][]Location) + + for _, vidString := range needsLookup { + vid := vidStringToUint[vidString] // Use pre-parsed value + if locations, found := mc.GetLocations(vid); found && len(locations) > 0 { + batchResult[vidString] = locations + } else { + stillNeedLookup = append(stillNeedLookup, vidString) + } + } + + if len(stillNeedLookup) == 0 { + return batchResult, nil } - for vid, vidLocation := range resp.VolumeIdLocations { - for _, vidLoc := range vidLocation.Locations { - loc := Location{ - Url: vidLoc.Url, - PublicUrl: vidLoc.PublicUrl, - GrpcPort: int(vidLoc.GrpcPort), - DataCenter: vidLoc.DataCenter, + + // Query master with batched volume IDs + glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup) + + err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: stillNeedLookup, + }) + if err != nil { + return fmt.Errorf("master lookup failed: %v", err) + } + + for _, vidLoc := range resp.VolumeIdLocations { + if vidLoc.Error != "" { + glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + continue } - mc.vidMap.addLocation(uint32(vid), loc) - httpUrl := "http://" + loc.Url + "/" + fileId - // Prefer same data center - if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { - fullUrls = append([]string{httpUrl}, fullUrls...) - } else { - fullUrls = append(fullUrls, httpUrl) + + // Parse volume ID from response + parts := strings.Split(vidLoc.VolumeOrFileId, ",") + vidOnly := parts[0] + vid, err := strconv.ParseUint(vidOnly, 10, 32) + if err != nil { + glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) + continue + } + + var locations []Location + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, + } + mc.vidMap.addLocation(uint32(vid), loc) + locations = append(locations, loc) + } + + if len(locations) > 0 { + batchResult[vidOnly] = locations } } + return nil + }) + + if err != nil { + return batchResult, err } - return nil + return batchResult, nil }) - return + + if err != nil { + lookupErrors = append(lookupErrors, err) + } + + // Merge singleflight batch results + if batchLocations, ok := sfResult.(map[string][]Location); ok { + for vid, locs := range batchLocations { + result[vid] = locs + } + } + + // Check for volumes that still weren't found + for _, vidString := range needsLookup { + if _, found := result[vidString]; !found { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString)) + } + } + + // Return aggregated errors using errors.Join to preserve error types + return result, errors.Join(lookupErrors...) } func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {