From e1a3ffcdbf7981473398e9526c6e0d8cb0fb24a0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 00:51:26 -0700 Subject: [PATCH 1/6] Hadoop: add exponential back off for failed reads --- .../java/seaweedfs/client/SeaweedRead.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 045751717..7e5c5cb88 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -97,8 +97,35 @@ public class SeaweedRead { public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + byte[] data = null; + for (long waitTime = 230L; waitTime < 20 * 1000; waitTime += waitTime / 2) { + for (FilerProto.Location location : locations.getLocationsList()) { + String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); + try { + data = doFetchOneFullChunkData(chunkView, url); + break; + } catch (IOException ioe) { + LOG.debug("doFetchFullChunkData {} :{}", url, ioe); + } + } + if (data != null) { + break; + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + } + } + + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + + return data; + + } + + public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + + HttpGet request = new HttpGet(url); request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); @@ -142,7 +169,7 @@ public class SeaweedRead { data = Gzip.decompress(data); } - LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length); return data; From 00a75d7c99e87f543a4e0a4390732bb2eaa286fe Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 00:57:54 -0700 Subject: [PATCH 2/6] Hadoop: fix reading file tail --- .../java/seaweedfs/client/SeaweedRead.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 7e5c5cb88..cc8befb7a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -40,7 +40,7 @@ public class SeaweedRead { //TODO parallel this long readCount = 0; - int startOffset = bufferOffset; + long startOffset = position; for (ChunkView chunkView : chunkViews) { if (startOffset < chunkView.logicOffset) { @@ -57,7 +57,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(position, buffer, startOffset, chunkView, locations); + int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -66,7 +66,7 @@ public class SeaweedRead { } - long limit = Math.min(bufferLength, fileSize); + long limit = Math.min(bufferOffset + bufferLength, fileSize); if (startOffset < limit) { long gap = limit - startOffset; @@ -78,7 +78,7 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -88,9 +88,9 @@ public class SeaweedRead { } int len = (int) chunkView.size; - LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", - chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); - System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len); + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.offset+chunkView.size, bufOffset, bufOffset+len, buffer.length, startOffset); + System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int)bufOffset, len); return len; } @@ -98,7 +98,8 @@ public class SeaweedRead { public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; - for (long waitTime = 230L; waitTime < 20 * 1000; waitTime += waitTime / 2) { + IOException lastException = null; + for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); try { @@ -106,6 +107,7 @@ public class SeaweedRead { break; } catch (IOException ioe) { LOG.debug("doFetchFullChunkData {} :{}", url, ioe); + lastException = ioe; } } if (data != null) { @@ -117,6 +119,10 @@ public class SeaweedRead { } } + if (data == null) { + throw lastException; + } + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); return data; From 6fc272f9136e9f816696f633c5536d7305026047 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 01:03:04 -0700 Subject: [PATCH 3/6] adjust throwing exception --- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index cc8befb7a..c8a9e49f1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -89,8 +89,8 @@ public class SeaweedRead { int len = (int) chunkView.size; LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.offset+chunkView.size, bufOffset, bufOffset+len, buffer.length, startOffset); - System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int)bufOffset, len); + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.offset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); + System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len); return len; } @@ -119,7 +119,7 @@ public class SeaweedRead { } } - if (data == null) { + if (data == null && lastException != null) { throw lastException; } From d3d3609a03e3dae80dc72444487806c4be96580a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 01:04:31 -0700 Subject: [PATCH 4/6] adjust logging --- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index c8a9e49f1..0e8919d06 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -88,8 +88,8 @@ public class SeaweedRead { } int len = (int) chunkView.size; - LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.offset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len); return len; From 9449100757e8d9cd37944bde48b89616db20bf03 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 01:06:54 -0700 Subject: [PATCH 5/6] adjust throwing exception --- .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 0e8919d06..ab2407dec 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -104,6 +104,7 @@ public class SeaweedRead { String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); try { data = doFetchOneFullChunkData(chunkView, url); + lastException = null; break; } catch (IOException ioe) { LOG.debug("doFetchFullChunkData {} :{}", url, ioe); @@ -119,7 +120,7 @@ public class SeaweedRead { } } - if (data == null && lastException != null) { + if (lastException != null) { throw lastException; } From 912ef2bc531c1bd09b21cadc87b8ea2620311cf3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 10 Oct 2020 01:14:17 -0700 Subject: [PATCH 6/6] Hadoop: remove unused variable bufferSize --- .../src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 2 +- .../main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 8 +++----- .../src/main/java/seaweed/hdfs/SeaweedInputStream.java | 5 +---- .../src/main/java/seaweed/hdfs/SeaweedFileSystem.java | 2 +- .../main/java/seaweed/hdfs/SeaweedFileSystemStore.java | 8 +++----- .../src/main/java/seaweed/hdfs/SeaweedInputStream.java | 5 +---- 6 files changed, 10 insertions(+), 20 deletions(-) diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 6551548fa..44c2ef053 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 3dc38fe1e..eec5bd2d3 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -205,10 +205,9 @@ public class SeaweedFileSystemStore { } - public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); FilerProto.Entry entry = lookupEntry(path); @@ -219,8 +218,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 36c0766a4..8bda2e092 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size private long position = 0; // cursor of the file @@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream { final FilerGrpcClient filerGrpcClient, final Statistics statistics, final String path, - final FilerProto.Entry entry, - final int bufferSize) throws IOException { + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.bufferSize = bufferSize; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 6551548fa..44c2ef053 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -74,7 +74,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, seaweedBufferSize); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 3dc38fe1e..eec5bd2d3 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -205,10 +205,9 @@ public class SeaweedFileSystemStore { } - public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); FilerProto.Entry entry = lookupEntry(path); @@ -219,8 +218,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 36c0766a4..8bda2e092 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -25,7 +25,6 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size private long position = 0; // cursor of the file @@ -35,14 +34,12 @@ public class SeaweedInputStream extends FSInputStream { final FilerGrpcClient filerGrpcClient, final Statistics statistics, final String path, - final FilerProto.Entry entry, - final int bufferSize) throws IOException { + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.bufferSize = bufferSize; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());