From f970abf14aa38240677131c45c7b7c0d16a8a39e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 25 Nov 2018 18:01:57 -0800
Subject: [PATCH] WIP: adding SeaweedOutputStream for write

---
 other/java/hdfs/pom.xml                       |  19 +-
 .../seaweed/hdfs/SeaweedOutputStream.java     | 323 ++++++++++++++++++
 .../main/java/seaweed/hdfs/SeaweedWrite.java  |  18 +
 3 files changed, 359 insertions(+), 1 deletion(-)
 create mode 100644 other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
 create mode 100644 other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java

diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index 1fb554a19..c91fa2bfe 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -7,9 +7,21 @@
     <groupId>seaweedfs</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>1.0-SNAPSHOT</version>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>7</source>
+                    <target>7</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 
     <properties>
-        <hadoop.version>2.2.0</hadoop.version>
+        <hadoop.version>3.1.1</hadoop.version>
     </properties>
 
     <dependencies>
@@ -23,6 +35,11 @@
             <artifactId>client</artifactId>
             <version>1.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
new file mode 100644
index 000000000..d0d488b1d
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -0,0 +1,323 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+
+    private final FilerGrpcClient filerGrpcClient;
+    private final List<FilerProto.Entry> entries = new ArrayList<>();
+    private final String path;
+    private final int bufferSize;
+    private final int maxConcurrentRequestCount;
+    private final ThreadPoolExecutor threadExecutor;
+    private final ExecutorCompletionService<Void> completionService;
+    private long position;
+    private boolean closed;
+    private boolean supportFlush = true;
+    private volatile IOException lastError;
+    private long lastFlushOffset;
+    private long lastTotalAppendOffset = 0;
+    private byte[] buffer;
+    private int bufferIndex;
+    private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+
+    public SeaweedOutputStream(FilerGrpcClient filerGrpcClient,
+                               final String path,
+                               final long position,
+                               final int bufferSize) {
+        this.filerGrpcClient = filerGrpcClient;
+        this.path = path;
+        this.position = position;
+        this.closed = false;
+        this.lastError = null;
+        this.lastFlushOffset = 0;
+        this.bufferSize = bufferSize;
+        this.buffer = new byte[bufferSize];
+        this.bufferIndex = 0;
+        this.writeOperations = new ConcurrentLinkedDeque<>();
+
+        this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+
+        this.threadExecutor
+            = new ThreadPoolExecutor(maxConcurrentRequestCount,
+                maxConcurrentRequestCount,
+                10L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>());
+        this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+    }
+
+    private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+        try {
+            SeaweedWrite.writeMeta(filerGrpcClient, path, entries);
+        } catch (Exception ex) {
+            throw new IOException(ex);
+        }
+        this.lastFlushOffset = offset;
+    }
+
+    @Override
+    public void write(final int byteVal) throws IOException {
+        write(new byte[]{(byte) (byteVal & 0xFF)});
+    }
+
+    @Override
+    public synchronized void write(final byte[] data, final int off, final int length)
+        throws IOException {
+        maybeThrowLastError();
+
+        Preconditions.checkArgument(data != null, "null data");
+
+        if (off < 0 || length < 0 || length > data.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        int currentOffset = off;
+        int writableBytes = bufferSize - bufferIndex;
+        int numberOfBytesToWrite = length;
+
+        while (numberOfBytesToWrite > 0) {
+            if (writableBytes <= numberOfBytesToWrite) {
+                System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+                bufferIndex += writableBytes;
+                writeCurrentBufferToService();
+                currentOffset += writableBytes;
+                numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+            } else {
+                System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+                bufferIndex += numberOfBytesToWrite;
+                numberOfBytesToWrite = 0;
+            }
+
+            writableBytes = bufferSize - bufferIndex;
+        }
+    }
+
+    /**
+     * Flushes this output stream and forces any buffered output bytes to be
+     * written out. If any data remains in the buffer it is committed to the
+     * service. Data is queued for writing and forced out to the service
+     * before the call returns.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (supportFlush) {
+            flushInternalAsync();
+        }
+    }
+
+    /**
+     * Similar to posix fsync, flush out the data in the client's user buffer
+     * all the way to the disk device (but the disk may have it in its cache).
+     *
+     * @throws IOException if an error occurs
+     */
+    @Override
+    public void hsync() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Flush out the data in the client's user buffer. After the return of
+     * this call, new readers will see the data.
+     *
+     * @throws IOException if any error occurs
+     */
+    @Override
+    public void hflush() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Query the stream for a specific capability.
+     *
+     * @param capability string to query the stream support for.
+     * @return true for hsync and hflush.
+     */
+    @Override
+    public boolean hasCapability(String capability) {
+        switch (capability.toLowerCase(Locale.ENGLISH)) {
+            case StreamCapabilities.HSYNC:
+            case StreamCapabilities.HFLUSH:
+                return supportFlush;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Force all data in the output stream to be written to SeaweedFS.
+     * Wait to return until this is complete. Close the access to the stream and
+     * shutdown the upload thread pool.
+     * Any error caught in a worker thread and stored will be rethrown here
+     * after cleanup.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        try {
+            flushInternal();
+            threadExecutor.shutdown();
+        } finally {
+            lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+            buffer = null;
+            bufferIndex = 0;
+            closed = true;
+            writeOperations.clear();
+            if (!threadExecutor.isShutdown()) {
+                threadExecutor.shutdownNow();
+            }
+        }
+    }
+
+    private synchronized void writeCurrentBufferToService() throws IOException {
+        if (bufferIndex == 0) {
+            return;
+        }
+
+        final byte[] bytes = buffer;
+        final int bytesLength = bufferIndex;
+
+        buffer = new byte[bufferSize];
+        bufferIndex = 0;
+        final long offset = position;
+        position += bytesLength;
+
+        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+            waitForTaskToComplete();
+        }
+
+        final Future<Void> job = completionService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                // originally: client.append(path, offset, bytes, 0, bytesLength);
+                FilerProto.Entry entry = SeaweedWrite.writeData(filerGrpcClient, offset, bytes, 0, bytesLength);
+                entries.add(entry);
+                return null;
+            }
+        });
+
+        writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+        // Try to shrink the queue
+        shrinkWriteOperationQueue();
+    }
+
+    private void waitForTaskToComplete() throws IOException {
+        boolean completed;
+        for (completed = false; completionService.poll() != null; completed = true) {
+            // keep polling until there is no data
+        }
+
+        if (!completed) {
+            try {
+                completionService.take();
+            } catch (InterruptedException e) {
+                lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+                throw lastError;
+            }
+        }
+    }
+
+    private void maybeThrowLastError() throws IOException {
+        if (lastError != null) {
+            throw lastError;
+        }
+    }
+
+    /**
+     * Try to remove the completed write operations from the beginning of the
+     * write operation FIFO queue.
+     */
+    private synchronized void shrinkWriteOperationQueue() throws IOException {
+        try {
+            while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+                writeOperations.peek().task.get();
+                lastTotalAppendOffset += writeOperations.peek().length;
+                writeOperations.remove();
+            }
+        } catch (Exception e) {
+            lastError = new IOException(e);
+            throw lastError;
+        }
+    }
+
+    private synchronized void flushInternal() throws IOException {
+        maybeThrowLastError();
+        writeCurrentBufferToService();
+        flushWrittenBytesToService();
+    }
+
+    private synchronized void flushInternalAsync() throws IOException {
+        maybeThrowLastError();
+        writeCurrentBufferToService();
+        flushWrittenBytesToServiceAsync();
+    }
+
+    private synchronized void flushWrittenBytesToService() throws IOException {
+        for (WriteOperation writeOperation : writeOperations) {
+            try {
+                writeOperation.task.get();
+            } catch (Exception ex) {
+                lastError = new IOException(ex);
+                throw lastError;
+            }
+        }
+        flushWrittenBytesToServiceInternal(position);
+    }
+
+    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+        shrinkWriteOperationQueue();
+
+        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
+        }
+    }
+
+    private static class WriteOperation {
+        private final Future<Void> task;
+        private final long startOffset;
+        private final long length;
+
+        WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+            Preconditions.checkNotNull(task, "task");
+            Preconditions.checkArgument(startOffset >= 0, "startOffset");
+            Preconditions.checkArgument(length >= 0, "length");
+
+            this.task = task;
+            this.startOffset = startOffset;
+            this.length = length;
+        }
+    }
+
+}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
new file mode 100644
index 000000000..1624eb739
--- /dev/null
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
@@ -0,0 +1,18 @@
+package seaweed.hdfs;
+
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+
+import java.util.List;
+
+public class SeaweedWrite {
+    public static FilerProto.Entry writeData(final FilerGrpcClient filerGrpcClient, final long offset,
+                                             final byte[] bytes, final long bytesOffset, final long bytesLength) {
+        return null;
+    }
+
+    public static void writeMeta(final FilerGrpcClient filerGrpcClient,
+                                 final String path, final List<FilerProto.Entry> entries) {
+        return;
+    }
+}
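
For context: SeaweedWrite.writeData and SeaweedWrite.writeMeta are still stubs in this WIP commit, so SeaweedOutputStream compiles but does not yet persist anything. Below is a minimal sketch of the intended call pattern once the stubs are implemented. The FilerGrpcClient constructor arguments, the target path, and the 8 MB buffer size are illustrative assumptions, not part of this commit.

    import seaweed.hdfs.SeaweedOutputStream;
    import seaweedfs.client.FilerGrpcClient;

    import java.nio.charset.StandardCharsets;

    public class SeaweedOutputStreamExample {
        public static void main(String[] args) throws Exception {
            // Assumption: FilerGrpcClient is constructed from the filer's gRPC
            // host and port; adjust to whatever the client API actually exposes.
            FilerGrpcClient client = new FilerGrpcClient("localhost", 18888);
            SeaweedOutputStream out = new SeaweedOutputStream(
                client, "/user/test/hello.txt", 0, 8 * 1024 * 1024);
            try {
                out.write("hello seaweedfs".getBytes(StandardCharsets.UTF_8)); // buffered in memory
                out.hflush(); // uploads buffered chunks and commits metadata via SeaweedWrite
            } finally {
                out.close();  // flushes remaining bytes, shuts down the upload pool
            }
        }
    }

Note the design: write() only copies into the in-memory buffer, and the actual uploads run on the ThreadPoolExecutor when the buffer fills or on flush/hsync/hflush, which is why hflush() is needed before readers can see the data.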