diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index 1fb554a19..c91fa2bfe 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -7,9 +7,21 @@
seaweedfs
hadoop-client
1.0-SNAPSHOT
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 7
+
+
+
+
- 2.2.0
+ 3.1.1
@@ -23,6 +35,11 @@
client
1.0-SNAPSHOT
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
new file mode 100644
index 000000000..d0d488b1d
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -0,0 +1,323 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+
+ private final FilerGrpcClient filerGrpcClient;
+ private final List entries = new ArrayList<>();
+ private final String path;
+ private final int bufferSize;
+ private final int maxConcurrentRequestCount;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService completionService;
+ private long position;
+ private boolean closed;
+ private boolean supportFlush = true;
+ private volatile IOException lastError;
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+ private byte[] buffer;
+ private int bufferIndex;
+ private ConcurrentLinkedDeque writeOperations;
+
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient,
+ final String path,
+ final long position,
+ final int bufferSize) {
+ this.filerGrpcClient = filerGrpcClient;
+ this.path = path;
+ this.position = position;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[bufferSize];
+ this.bufferIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+
+ }
+
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+ try {
+ SeaweedWrite.writeMeta(filerGrpcClient, path, entries);
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ this.lastFlushOffset = offset;
+ }
+
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[]{(byte) (byteVal & 0xFF)});
+ }
+
+ @Override
+ public synchronized void write(final byte[] data, final int off, final int length)
+ throws IOException {
+ maybeThrowLastError();
+
+ Preconditions.checkArgument(data != null, "null data");
+
+ if (off < 0 || length < 0 || length > data.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int currentOffset = off;
+ int writableBytes = bufferSize - bufferIndex;
+ int numberOfBytesToWrite = length;
+
+ while (numberOfBytesToWrite > 0) {
+ if (writableBytes <= numberOfBytesToWrite) {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+ bufferIndex += writableBytes;
+ writeCurrentBufferToService();
+ currentOffset += writableBytes;
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ } else {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+ bufferIndex += numberOfBytesToWrite;
+ numberOfBytesToWrite = 0;
+ }
+
+ writableBytes = bufferSize - bufferIndex;
+ }
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ if (supportFlush) {
+ flushInternalAsync();
+ }
+ }
+
+ /**
+ * Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ *
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ *
+ * @throws IOException if any error occurs
+ */
+ @Override
+ public void hflush() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Query the stream for a specific capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true for hsync and hflush.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ case StreamCapabilities.HSYNC:
+ case StreamCapabilities.HFLUSH:
+ return supportFlush;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shutdown the upload thread pool.
+ * If the blob was created, its lease will be released.
+ * Any error encountered caught in threads and stored will be rethrown here
+ * after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ flushInternal();
+ threadExecutor.shutdown();
+ } finally {
+ lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ buffer = null;
+ bufferIndex = 0;
+ closed = true;
+ writeOperations.clear();
+ if (!threadExecutor.isShutdown()) {
+ threadExecutor.shutdownNow();
+ }
+ }
+ }
+
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (bufferIndex == 0) {
+ return;
+ }
+
+ final byte[] bytes = buffer;
+ final int bytesLength = bufferIndex;
+
+ buffer = new byte[bufferSize];
+ bufferIndex = 0;
+ final long offset = position;
+ position += bytesLength;
+
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ waitForTaskToComplete();
+ }
+
+ final Future job = completionService.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ // originally: client.append(path, offset, bytes, 0, bytesLength);
+ FilerProto.Entry entry = SeaweedWrite.writeData(filerGrpcClient, offset, bytes, 0, bytesLength);
+ entries.add(entry);
+ return null;
+ }
+ });
+
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+ }
+
+ private void waitForTaskToComplete() throws IOException {
+ boolean completed;
+ for (completed = false; completionService.poll() != null; completed = true) {
+ // keep polling until there is no data
+ }
+
+ if (!completed) {
+ try {
+ completionService.take();
+ } catch (InterruptedException e) {
+ lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+ throw lastError;
+ }
+ }
+ }
+
+ private void maybeThrowLastError() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Try to remove the completed write operations from the beginning of write
+ * operation FIFO queue.
+ */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+ try {
+ while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+ writeOperations.peek().task.get();
+ lastTotalAppendOffset += writeOperations.peek().length;
+ writeOperations.remove();
+ }
+ } catch (Exception e) {
+ lastError = new IOException(e);
+ throw lastError;
+ }
+ }
+
+ private synchronized void flushInternal() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService();
+ }
+
+ private synchronized void flushInternalAsync() throws IOException {
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
+ }
+
+ private synchronized void flushWrittenBytesToService() throws IOException {
+ for (WriteOperation writeOperation : writeOperations) {
+ try {
+ writeOperation.task.get();
+ } catch (Exception ex) {
+ lastError = new IOException(ex);
+ throw lastError;
+ }
+ }
+ flushWrittenBytesToServiceInternal(position);
+ }
+
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+ shrinkWriteOperationQueue();
+
+ if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
+ }
+ }
+
+ private static class WriteOperation {
+ private final Future task;
+ private final long startOffset;
+ private final long length;
+
+ WriteOperation(final Future task, final long startOffset, final long length) {
+ Preconditions.checkNotNull(task, "task");
+ Preconditions.checkArgument(startOffset >= 0, "startOffset");
+ Preconditions.checkArgument(length >= 0, "length");
+
+ this.task = task;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ }
+
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
new file mode 100644
index 000000000..1624eb739
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
@@ -0,0 +1,18 @@
+package seaweed.hdfs;
+
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.util.List;
+
+public class SeaweedWrite {
+ public static FilerProto.Entry writeData(final FilerGrpcClient filerGrpcClient, final long offset,
+ final byte[] bytes, final long bytesOffset, final long bytesLength) {
+ return null;
+ }
+
+ public static void writeMeta(final FilerGrpcClient filerGrpcClient,
+ final String path, final List entries) {
+ return;
+ }
+}