diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 8cf72c02c..65493d5cf 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.3.4
+ 1.3.5
org.sonatype.oss
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 8cf72c02c..65493d5cf 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.3.4
+ 1.3.5
org.sonatype.oss
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index c9a8247c4..d6fcfb11f 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.3.4
+ 1.3.5
org.sonatype.oss
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..897fe9694
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,22 @@
+package seaweedfs.client;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+ static List bufferList = new ArrayList<>();
+
+ public static synchronized ByteBuffer request(int bufferSize) {
+ if (bufferList.isEmpty()) {
+ return ByteBuffer.allocate(bufferSize);
+ }
+ return bufferList.remove(bufferList.size()-1);
+ }
+
+ public static synchronized void release(ByteBuffer obj) {
+ bufferList.add(obj);
+ }
+
+}
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 64de808c4..06bfda480 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
- 1.3.4
+ 1.3.5
2.9.2
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 8bbd31f29..24699b99a 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
4.0.0
- 1.3.4
+ 1.3.5
2.9.2
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 1100e9a53..1138ecca2 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@@ -14,7 +15,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@@ -30,15 +31,15 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService completionService;
private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = true;
+ private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque writeOperations;
private long position;
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -51,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- // this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@@ -94,41 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (buffer == null) {
- buffer = new byte[32];
- }
- // ensureCapacity
- if (numberOfBytesToWrite > buffer.length - bufferIndex) {
- int capacity = buffer.length;
- while(capacity-bufferIndex= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future job = completionService.submit(new Callable() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index a60b0c575..b965219e0 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -127,7 +127,7 @@
- 1.3.4
+ 1.3.5
3.1.1
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index b55d2ac84..8a35423a6 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
4.0.0
- 1.3.4
+ 1.3.5
3.1.1
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 3e0e89417..9ea26776b 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@@ -16,6 +17,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.*;
@@ -33,15 +35,15 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService completionService;
private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = true;
+ private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque writeOperations;
private long position;
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
+ private ByteBuffer buffer;
+ private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@@ -54,8 +56,8 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
- // this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
+ this.buffer = ByteBufferPool.request(bufferSize);
+ this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@@ -97,41 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw new IndexOutOfBoundsException();
}
+ // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
+
int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
+ int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
- if (buffer == null) {
- buffer = new byte[32];
- }
- // ensureCapacity
- if (numberOfBytesToWrite > buffer.length - bufferIndex) {
- int capacity = buffer.length;
- while (capacity - bufferIndex < numberOfBytesToWrite) {
- capacity = capacity << 1;
- }
- if (capacity < 0) {
- throw new OutOfMemoryError();
- }
- buffer = Arrays.copyOf(buffer, capacity);
- }
-
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
+ if (numberOfBytesToWrite < writableBytes) {
+ buffer.put(data, currentOffset, numberOfBytesToWrite);
+ outputIndex += numberOfBytesToWrite;
+ break;
}
- writableBytes = bufferSize - bufferIndex;
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+ buffer.put(data, currentOffset, writableBytes);
+ outputIndex += writableBytes;
+ currentOffset += writableBytes;
+ writeCurrentBufferToService();
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ writableBytes = bufferSize - buffer.position();
}
+
}
/**
@@ -210,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ ByteBufferPool.release(buffer);
buffer = null;
- bufferIndex = 0;
+ outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@@ -221,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
+ if (buffer.position() == 0) {
return;
}
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = null; // new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
+ buffer.flip();
+ int bytesLength = buffer.limit() - buffer.position();
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
+ // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
+ buffer.clear();
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future job = completionService.submit(new Callable() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {