diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 045751717..ab2407dec 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -40,7 +40,7 @@ public class SeaweedRead { //TODO parallel this long readCount = 0; - int startOffset = bufferOffset; + long startOffset = position; for (ChunkView chunkView : chunkViews) { if (startOffset < chunkView.logicOffset) { @@ -57,7 +57,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(position, buffer, startOffset, chunkView, locations); + int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -66,7 +66,7 @@ public class SeaweedRead { } - long limit = Math.min(bufferLength, fileSize); + long limit = Math.min(bufferOffset + bufferLength, fileSize); if (startOffset < limit) { long gap = limit - startOffset; @@ -78,7 +78,7 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -88,17 +88,51 @@ public class SeaweedRead { } int len = (int) chunkView.size; - LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", - chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); - System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len); + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); + System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len); return len; } public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + byte[] data = null; + IOException lastException = null; + for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { + for (FilerProto.Location location : locations.getLocationsList()) { + String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); + try { + data = doFetchOneFullChunkData(chunkView, url); + lastException = null; + break; + } catch (IOException ioe) { + LOG.debug("doFetchFullChunkData {} :{}", url, ioe); + lastException = ioe; + } + } + if (data != null) { + break; + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + } + } + + if (lastException != null) { + throw lastException; + } + + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + + return data; + + } + + public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + + HttpGet request = new HttpGet(url); request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); @@ -142,7 +176,7 @@ public class SeaweedRead { data = Gzip.decompress(data); } - LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length); return data; diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 6551548fa..44c2ef053 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 3dc38fe1e..eec5bd2d3 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -205,10 +205,9 @@ public class SeaweedFileSystemStore { } - public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); FilerProto.Entry entry = lookupEntry(path); @@ -219,8 +218,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 36c0766a4..8bda2e092 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size private long position = 0; // cursor of the file @@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream { final FilerGrpcClient filerGrpcClient, final Statistics statistics, final String path, - final FilerProto.Entry entry, - final int bufferSize) throws IOException { + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.bufferSize = bufferSize; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 6551548fa..44c2ef053 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index c76160bd2..7ec91929c 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -205,10 +205,9 @@ public class SeaweedFileSystemStore { } - public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); FilerProto.Entry entry = lookupEntry(path); @@ -219,8 +218,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 36c0766a4..8bda2e092 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size private long position = 0; // cursor of the file @@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream { final FilerGrpcClient filerGrpcClient, final Statistics statistics, final String path, - final FilerProto.Entry entry, - final int bufferSize) throws IOException { + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.bufferSize = bufferSize; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());