From 7bca72deedb872402b4e597976eb198f3dad2d74 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 15 Jul 2020 23:33:31 -0700 Subject: [PATCH] reuse bytebuffer --- other/java/client/pom.xml | 2 +- other/java/client/pom.xml.deploy | 2 +- other/java/client/pom_debug.xml | 2 +- .../java/seaweedfs/client/ByteBufferPool.java | 22 +++++ other/java/hdfs2/dependency-reduced-pom.xml | 2 +- other/java/hdfs2/pom.xml | 2 +- .../seaweed/hdfs/SeaweedOutputStream.java | 88 +++++++------------ other/java/hdfs3/dependency-reduced-pom.xml | 2 +- other/java/hdfs3/pom.xml | 2 +- .../seaweed/hdfs/SeaweedOutputStream.java | 87 +++++++----------- 10 files changed, 89 insertions(+), 122 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 8cf72c02c..65493d5cf 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.4 + 1.3.5 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 8cf72c02c..65493d5cf 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.4 + 1.3.5 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index c9a8247c4..d6fcfb11f 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.4 + 1.3.5 org.sonatype.oss diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java new file mode 100644 index 000000000..897fe9694 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java @@ -0,0 +1,22 @@ +package seaweedfs.client; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ByteBufferPool { + + static List bufferList = new ArrayList<>(); + + public static synchronized ByteBuffer request(int bufferSize) { + if (bufferList.isEmpty()) { + return ByteBuffer.allocate(bufferSize); + } + return bufferList.remove(bufferList.size()-1); + } + + public static synchronized void release(ByteBuffer obj) { + bufferList.add(obj); + } + +} diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 64de808c4..06bfda480 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.3.4 + 1.3.5 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 8bbd31f29..24699b99a 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.3.4 + 1.3.5 2.9.2 diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 1100e9a53..1138ecca2 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -14,7 +15,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.Arrays; +import java.nio.ByteBuffer; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -30,15 +31,15 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; private final FilerProto.Entry.Builder entry; - private final boolean supportFlush = true; + private final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -51,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - // this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -94,41 +95,29 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; + int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { - if (buffer == null) { - buffer = new byte[32]; - } - // ensureCapacity - if (numberOfBytesToWrite > buffer.length - bufferIndex) { - int capacity = buffer.length; - while(capacity-bufferIndex= maxConcurrentRequestCount * 2) { - waitForTaskToComplete(); - } - - final Future job = completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } - }); - - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); } private void waitForTaskToComplete() throws IOException { diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index a60b0c575..b965219e0 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.3.4 + 1.3.5 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index b55d2ac84..8a35423a6 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.3.4 + 1.3.5 3.1.1 diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 3e0e89417..9ea26776b 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -16,6 +17,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Locale; import java.util.concurrent.*; @@ -33,15 +35,15 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; private final FilerProto.Entry.Builder entry; - private final boolean supportFlush = true; + private final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -54,8 +56,8 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - // this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -97,41 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea throw new IndexOutOfBoundsException(); } + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; + int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { - if (buffer == null) { - buffer = new byte[32]; - } - // ensureCapacity - if (numberOfBytesToWrite > buffer.length - bufferIndex) { - int capacity = buffer.length; - while (capacity - bufferIndex < numberOfBytesToWrite) { - capacity = capacity << 1; - } - if (capacity < 0) { - throw new OutOfMemoryError(); - } - buffer = Arrays.copyOf(buffer, capacity); - } - - if (writableBytes <= numberOfBytesToWrite) { - System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); - bufferIndex += writableBytes; - writeCurrentBufferToService(); - currentOffset += writableBytes; - numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; - } else { - System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); - bufferIndex += numberOfBytesToWrite; - numberOfBytesToWrite = 0; + if (numberOfBytesToWrite < writableBytes) { + buffer.put(data, currentOffset, numberOfBytesToWrite); + outputIndex += numberOfBytesToWrite; + break; } - writableBytes = bufferSize - bufferIndex; + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")"); + buffer.put(data, currentOffset, writableBytes); + outputIndex += writableBytes; + currentOffset += writableBytes; + writeCurrentBufferToService(); + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + writableBytes = bufferSize - buffer.position(); } + } /** @@ -210,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea threadExecutor.shutdown(); } finally { lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + ByteBufferPool.release(buffer); buffer = null; - bufferIndex = 0; + outputIndex = 0; closed = true; writeOperations.clear(); if (!threadExecutor.isShutdown()) { @@ -221,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea } private synchronized void writeCurrentBufferToService() throws IOException { - if (bufferIndex == 0) { + if (buffer.position() == 0) { return; } - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; - - buffer = null; // new byte[bufferSize]; - bufferIndex = 0; - final long offset = position; + buffer.flip(); + int bytesLength = buffer.limit() - buffer.position(); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit()); + // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")"); position += bytesLength; + buffer.clear(); - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { - waitForTaskToComplete(); - } - - final Future job = completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } - }); - - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); } private void waitForTaskToComplete() throws IOException {