|
@ -19,6 +19,7 @@ public class SeaweedRead { |
|
|
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); |
|
|
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); |
|
|
|
|
|
|
|
|
static ChunkCache chunkCache = new ChunkCache(4); |
|
|
static ChunkCache chunkCache = new ChunkCache(4); |
|
|
|
|
|
static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); |
|
|
|
|
|
|
|
|
// returns bytesRead |
|
|
// returns bytesRead |
|
|
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, |
|
|
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, |
|
@ -30,13 +31,19 @@ public class SeaweedRead { |
|
|
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); |
|
|
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); |
|
|
for (ChunkView chunkView : chunkViews) { |
|
|
for (ChunkView chunkView : chunkViews) { |
|
|
String vid = parseVolumeId(chunkView.fileId); |
|
|
String vid = parseVolumeId(chunkView.fileId); |
|
|
|
|
|
if (volumeIdCache.getLocations(vid)==null){ |
|
|
lookupRequest.addVolumeIds(vid); |
|
|
lookupRequest.addVolumeIds(vid); |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (lookupRequest.getVolumeIdsCount()>0){ |
|
|
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient |
|
|
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient |
|
|
.getBlockingStub().lookupVolume(lookupRequest.build()); |
|
|
.getBlockingStub().lookupVolume(lookupRequest.build()); |
|
|
|
|
|
|
|
|
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); |
|
|
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); |
|
|
|
|
|
for (Map.Entry<String,FilerProto.Locations> entry : vid2Locations.entrySet()) { |
|
|
|
|
|
volumeIdCache.setLocations(entry.getKey(), entry.getValue()); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
//TODO parallel this |
|
|
//TODO parallel this |
|
|
long readCount = 0; |
|
|
long readCount = 0; |
|
@ -50,7 +57,7 @@ public class SeaweedRead { |
|
|
startOffset += gap; |
|
|
startOffset += gap; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); |
|
|
|
|
|
|
|
|
FilerProto.Locations locations = volumeIdCache.getLocations(parseVolumeId(chunkView.fileId)); |
|
|
if (locations == null || locations.getLocationsCount() == 0) { |
|
|
if (locations == null || locations.getLocationsCount() == 0) { |
|
|
LOG.error("failed to locate {}", chunkView.fileId); |
|
|
LOG.error("failed to locate {}", chunkView.fileId); |
|
|
// log here! |
|
|
// log here! |
|
|