From 3857f9c8404c6f251135268e42c79bb83c13b2b0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 2 Dec 2020 23:45:39 -0800 Subject: [PATCH] Hadoop: switch to ByteBuffer fix https://github.com/chrislusf/seaweedfs/issues/1645 --- other/java/client/pom_debug.xml | 5 +++ .../java/seaweedfs/client/SeaweedRead.java | 20 +++++----- .../java/seaweed/hdfs/SeaweedInputStream.java | 39 +++++++++++-------- .../seaweed/hdfs/SeaweedFileSystemStore.java | 2 +- .../java/seaweed/hdfs/SeaweedInputStream.java | 31 ++++++++++----- 5 files changed, 60 insertions(+), 37 deletions(-) diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index acdf621a5..078778b6c 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -68,6 +68,11 @@ 4.13.1 test + + javax.annotation + javax.annotation-api + 1.3.2 + diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 2b530d2dd..c45987bed 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { @@ -23,10 +24,9 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, - final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength, final long fileSize) throws IOException { + final long position, final ByteBuffer buf, final long fileSize) throws IOException { - List chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + List chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); Map knownLocations = new HashMap<>(); @@ -59,6 +59,7 @@ public class SeaweedRead { if (startOffset < chunkView.logicOffset) { long gap = chunkView.logicOffset - startOffset; LOG.debug("zero [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); readCount += gap; startOffset += gap; } @@ -70,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations); + int len = readChunkView(startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -79,11 +80,12 @@ public class SeaweedRead { } - long limit = Math.min(bufferOffset + bufferLength, fileSize); + long limit = Math.min(buf.limit(), fileSize); if (startOffset < limit) { long gap = limit - startOffset; LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); readCount += gap; startOffset += gap; } @@ -91,7 +93,7 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -101,9 +103,9 @@ public class SeaweedRead { } int len = (int) chunkView.size; - LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); - System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len); + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); return len; } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 2cf544162..690366849 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -65,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada } @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + public int read(final byte[] b, final int off, final int len) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } @@ -86,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + long bytesRead = 0; - if (position+len <= entry.getContent().size()) { - entry.getContent().copyTo(b, (int) position, (int) off, len); + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { @@ -105,13 +117,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada } return (int) bytesRead; - - } - - // implement ByteBufferReadable - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - return read(buf.array(), buf.position(), buf.remaining()); } /** diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 2ef1a7468..14b32528e 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -288,4 +288,4 @@ public class SeaweedFileSystemStore { } -} \ No newline at end of file +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 2cf544162..752f06374 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -86,11 +86,29 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + long bytesRead = 0; - if (position+len <= entry.getContent().size()) { - entry.getContent().copyTo(b, (int) position, (int) off, len); + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { @@ -105,13 +123,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada } return (int) bytesRead; - - } - - // implement ByteBufferReadable - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - return read(buf.array(), buf.position(), buf.remaining()); } /**