|
|
@ -67,39 +67,44 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { |
|
|
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { |
|
|
|
|
|
// Try cache first using the fast path
|
|
|
fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) |
|
|
fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) |
|
|
if err == nil && len(fullUrls) > 0 { |
|
|
if err == nil && len(fullUrls) > 0 { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { |
|
|
|
|
|
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ |
|
|
|
|
|
VolumeOrFileIds: []string{fileId}, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("LookupVolume %s failed: %v", fileId, err) |
|
|
|
|
|
} |
|
|
|
|
|
for vid, vidLocation := range resp.VolumeIdLocations { |
|
|
|
|
|
for _, vidLoc := range vidLocation.Locations { |
|
|
|
|
|
loc := Location{ |
|
|
|
|
|
Url: vidLoc.Url, |
|
|
|
|
|
PublicUrl: vidLoc.PublicUrl, |
|
|
|
|
|
GrpcPort: int(vidLoc.GrpcPort), |
|
|
|
|
|
DataCenter: vidLoc.DataCenter, |
|
|
|
|
|
} |
|
|
|
|
|
mc.vidMap.addLocation(uint32(vid), loc) |
|
|
|
|
|
httpUrl := "http://" + loc.Url + "/" + fileId |
|
|
|
|
|
// Prefer same data center
|
|
|
|
|
|
if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { |
|
|
|
|
|
fullUrls = append([]string{httpUrl}, fullUrls...) |
|
|
|
|
|
} else { |
|
|
|
|
|
fullUrls = append(fullUrls, httpUrl) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
|
|
|
|
|
|
parts := strings.Split(fileId, ",") |
|
|
|
|
|
if len(parts) != 2 { |
|
|
|
|
|
return nil, fmt.Errorf("invalid fileId %s", fileId) |
|
|
|
|
|
} |
|
|
|
|
|
volumeId := parts[0] |
|
|
|
|
|
|
|
|
|
|
|
// Use shared lookup logic with batching and singleflight
|
|
|
|
|
|
vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
locations, found := vidLocations[volumeId] |
|
|
|
|
|
if !found || len(locations) == 0 { |
|
|
|
|
|
return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Build HTTP URLs from locations, preferring same data center
|
|
|
|
|
|
var sameDcUrls, otherDcUrls []string |
|
|
|
|
|
for _, loc := range locations { |
|
|
|
|
|
httpUrl := "http://" + loc.Url + "/" + fileId |
|
|
|
|
|
if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { |
|
|
|
|
|
sameDcUrls = append(sameDcUrls, httpUrl) |
|
|
|
|
|
} else { |
|
|
|
|
|
otherDcUrls = append(otherDcUrls, httpUrl) |
|
|
} |
|
|
} |
|
|
return nil |
|
|
|
|
|
}) |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Prefer same data center
|
|
|
|
|
|
fullUrls = append(sameDcUrls, otherDcUrls...) |
|
|
|
|
|
return fullUrls, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
|
|
|
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
|
|
|
|