diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
similarity index 89%
rename from other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
rename to other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 26290c46c..b09a15a5c 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -1,16 +1,9 @@
-package seaweed.hdfs;
+package seaweedfs.client;
 
 // adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -18,20 +11,18 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.*;
 
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
 public class SeaweedOutputStream extends OutputStream {
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
 
     private final FilerGrpcClient filerGrpcClient;
-    private final Path path;
+    private final String path;
     private final int bufferSize;
     private final int maxConcurrentRequestCount;
     private final ThreadPoolExecutor threadExecutor;
     private final ExecutorCompletionService completionService;
     private final FilerProto.Entry.Builder entry;
-    private final boolean supportFlush = false; // true;
+    protected final boolean supportFlush = false; // true;
     private final ConcurrentLinkedDeque writeOperations;
     private long position;
     private boolean closed;
@@ -42,7 +33,7 @@ public class SeaweedOutputStream extends OutputStream {
     private long outputIndex;
     private String replication = "000";
 
-    public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+    public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
                                final long position, final int bufferSize, final String replication) {
         this.filerGrpcClient = filerGrpcClient;
         this.replication = replication;
@@ -70,6 +61,14 @@ public class SeaweedOutputStream extends OutputStream {
 
     }
 
+    public static String getParentDirectory(String path) {
+        if (path.equals("/")) {
+            return path;
+        }
+        int lastSlashIndex = path.lastIndexOf("/");
+        return lastSlashIndex == 0 ? "/" : path.substring(0, lastSlashIndex); // parent of "/a" is "/", not ""
+    }
+
     private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
         try {
             SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
@@ -89,7 +88,9 @@ public class SeaweedOutputStream extends OutputStream {
             throws IOException {
         maybeThrowLastError();
 
-        Preconditions.checkArgument(data != null, "null data");
+        if (data == null) {
+            return;
+        }
 
         if (off < 0 || length < 0 || length > data.length - off) {
             throw new IndexOutOfBoundsException();
@@ -152,7 +153,7 @@ public class SeaweedOutputStream extends OutputStream {
             flushInternal();
             threadExecutor.shutdown();
         } finally {
-            lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+            lastError = new IOException("Stream is closed!");
             ByteBufferPool.release(buffer);
             buffer = null;
             outputIndex = 0;
@@ -185,7 +186,7 @@ public class SeaweedOutputStream extends OutputStream {
         }
         final Future job = completionService.submit(() -> {
             // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
-            SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
+            SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
             // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
             ByteBufferPool.release(bufferToWrite);
             return null;
@@ -239,13 +240,13 @@ public class SeaweedOutputStream extends OutputStream {
         }
     }
 
-    private synchronized void flushInternal() throws IOException {
+    protected synchronized void flushInternal() throws IOException {
         maybeThrowLastError();
         writeCurrentBufferToService();
         flushWrittenBytesToService();
     }
 
-    private synchronized void flushInternalAsync() throws IOException {
+    protected synchronized void flushInternalAsync() throws IOException {
         maybeThrowLastError();
         writeCurrentBufferToService();
         flushWrittenBytesToServiceAsync();
@@ -278,10 +279,6 @@ public class SeaweedOutputStream extends OutputStream {
         private final long length;
 
         WriteOperation(final Future task, final long startOffset, final long length) {
-            Preconditions.checkNotNull(task, "task");
-            Preconditions.checkArgument(startOffset >= 0, "startOffset");
-            Preconditions.checkArgument(length >= 0, "length");
-
             this.task = task;
             this.startOffset = startOffset;
             this.length = length;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..719e52579 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -216,7 +216,7 @@ public class SeaweedFileSystemStore {
       SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
     }
 
-    return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+    return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication);
 
   }
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..f7a6225d8
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,16 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
+
+    public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
+                                     final long position, final int bufferSize, final String replication) {
+        super(filerGrpcClient, path, entry, position, bufferSize, replication);
+    }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..719e52579 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -216,7 +216,7 @@ public class SeaweedFileSystemStore {
       SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
     }
 
-    return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+    return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication);
 
   }
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..f65aef619
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,64 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+
+    public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
+                                     final long position, final int bufferSize, final String replication) {
+        super(filerGrpcClient, path, entry, position, bufferSize, replication);
+    }
+
+    /**
+     * Similar to posix fsync, flush out the data in client's user buffer
+     * all the way to the disk device (but the disk may have it in its cache).
+     *
+     * @throws IOException if error occurs
+     */
+    @Override
+    public void hsync() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Flush out the data in client's user buffer. After the return of
+     * this call, new readers will see the data.
+     *
+     * @throws IOException if any error occurs
+     */
+    @Override
+    public void hflush() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Query the stream for a specific capability.
+     *
+     * @param capability string to query the stream support for.
+     * @return true for hsync and hflush.
+     */
+    @Override
+    public boolean hasCapability(String capability) {
+        switch (capability.toLowerCase(Locale.ENGLISH)) {
+            case StreamCapabilities.HSYNC:
+            case StreamCapabilities.HFLUSH:
+                return supportFlush;
+            default:
+                return false;
+        }
+    }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index d4c967a06..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
-    private final FilerGrpcClient filerGrpcClient;
-    private final Path path;
-    private final int bufferSize;
-    private final int maxConcurrentRequestCount;
-    private final ThreadPoolExecutor threadExecutor;
-    private final ExecutorCompletionService completionService;
-    private final FilerProto.Entry.Builder entry;
-    private final boolean supportFlush = false; // true;
-    private final ConcurrentLinkedDeque writeOperations;
-    private long position;
-    private boolean closed;
-    private volatile IOException lastError;
-    private long lastFlushOffset;
-    private long lastTotalAppendOffset = 0;
-    private ByteBuffer buffer;
-    private long outputIndex;
-    private String replication = "000";
-
-    public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
-                               final long position, final int bufferSize, final String replication) {
-        this.filerGrpcClient = filerGrpcClient;
-        this.replication = replication;
-        this.path = path;
-        this.position = position;
-        this.closed = false;
-        this.lastError = null;
-        this.lastFlushOffset = 0;
-        this.bufferSize = bufferSize;
-        this.buffer = ByteBufferPool.request(bufferSize);
-        this.outputIndex = 0;
-        this.writeOperations = new ConcurrentLinkedDeque<>();
-
-        this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
-
-        this.threadExecutor
-                = new ThreadPoolExecutor(maxConcurrentRequestCount,
-                maxConcurrentRequestCount,
-                120L,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue());
-        this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
-        this.entry = entry;
-
-    }
-
-    private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-        try {
-            SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
-        } catch (Exception ex) {
-            throw new IOException(ex);
-        }
-        this.lastFlushOffset = offset;
-    }
-
-    @Override
-    public void write(final int byteVal) throws IOException {
-        write(new byte[]{(byte) (byteVal & 0xFF)});
-    }
-
-    @Override
-    public synchronized void write(final byte[] data, final int off, final int length)
-            throws IOException {
-        maybeThrowLastError();
-
-        Preconditions.checkArgument(data != null, "null data");
-
-        if (off < 0 || length < 0 || length > data.length - off) {
-            throw new IndexOutOfBoundsException();
-        }
-
-        // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
-        int currentOffset = off;
-        int writableBytes = bufferSize - buffer.position();
-        int numberOfBytesToWrite = length;
-
-        while (numberOfBytesToWrite > 0) {
-
-            if (numberOfBytesToWrite < writableBytes) {
-                buffer.put(data, currentOffset, numberOfBytesToWrite);
-                outputIndex += numberOfBytesToWrite;
-                break;
-            }
-
-            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
-            buffer.put(data, currentOffset, writableBytes);
-            outputIndex += writableBytes;
-            currentOffset += writableBytes;
-            writeCurrentBufferToService();
-            numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-            writableBytes = bufferSize - buffer.position();
-        }
-
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be
-     * written out. If any data remains in the payload it is committed to the
-     * service. Data is queued for writing and forced out to the service
-     * before the call returns.
-     */
-    @Override
-    public void flush() throws IOException {
-        if (supportFlush) {
-            flushInternalAsync();
-        }
-    }
-
-    /**
-     * Similar to posix fsync, flush out the data in client's user buffer
-     * all the way to the disk device (but the disk may have it in its cache).
-     *
-     * @throws IOException if error occurs
-     */
-    @Override
-    public void hsync() throws IOException {
-        if (supportFlush) {
-            flushInternal();
-        }
-    }
-
-    /**
-     * Flush out the data in client's user buffer. After the return of
-     * this call, new readers will see the data.
-     *
-     * @throws IOException if any error occurs
-     */
-    @Override
-    public void hflush() throws IOException {
-        if (supportFlush) {
-            flushInternal();
-        }
-    }
-
-    /**
-     * Query the stream for a specific capability.
-     *
-     * @param capability string to query the stream support for.
-     * @return true for hsync and hflush.
-     */
-    @Override
-    public boolean hasCapability(String capability) {
-        switch (capability.toLowerCase(Locale.ENGLISH)) {
-            case StreamCapabilities.HSYNC:
-            case StreamCapabilities.HFLUSH:
-                return supportFlush;
-            default:
-                return false;
-        }
-    }
-
-    /**
-     * Force all data in the output stream to be written to Azure storage.
-     * Wait to return until this is complete. Close the access to the stream and
-     * shutdown the upload thread pool.
-     * If the blob was created, its lease will be released.
-     * Any error encountered caught in threads and stored will be rethrown here
-     * after cleanup.
-     */
-    @Override
-    public synchronized void close() throws IOException {
-        if (closed) {
-            return;
-        }
-
-        LOG.debug("close path: {}", path);
-        try {
-            flushInternal();
-            threadExecutor.shutdown();
-        } finally {
-            lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-            ByteBufferPool.release(buffer);
-            buffer = null;
-            outputIndex = 0;
-            closed = true;
-            writeOperations.clear();
-            if (!threadExecutor.isShutdown()) {
-                threadExecutor.shutdownNow();
-            }
-        }
-    }
-
-    private synchronized void writeCurrentBufferToService() throws IOException {
-        if (buffer.position() == 0) {
-            return;
-        }
-
-        position += submitWriteBufferToService(buffer, position);
-
-        buffer = ByteBufferPool.request(bufferSize);
-
-    }
-
-    private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
-
-        bufferToWrite.flip();
-        int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
-
-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
-            waitForTaskToComplete();
-        }
-        final Future job = completionService.submit(() -> {
-            // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
-            SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
-            // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
-            ByteBufferPool.release(bufferToWrite);
-            return null;
-        });
-
-        writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
-
-        // Try to shrink the queue
-        shrinkWriteOperationQueue();
-
-        return bytesLength;
-
-    }
-
-    private void waitForTaskToComplete() throws IOException {
-        boolean completed;
-        for (completed = false; completionService.poll() != null; completed = true) {
-            // keep polling until there is no data
-        }
-
-        if (!completed) {
-            try {
-                completionService.take();
-            } catch (InterruptedException e) {
-                lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
-                throw lastError;
-            }
-        }
-    }
-
-    private void maybeThrowLastError() throws IOException {
-        if (lastError != null) {
-            throw lastError;
-        }
-    }
-
-    /**
-     * Try to remove the completed write operations from the beginning of write
-     * operation FIFO queue.
-     */
-    private synchronized void shrinkWriteOperationQueue() throws IOException {
-        try {
-            while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
-                writeOperations.peek().task.get();
-                lastTotalAppendOffset += writeOperations.peek().length;
-                writeOperations.remove();
-            }
-        } catch (Exception e) {
-            lastError = new IOException(e);
-            throw lastError;
-        }
-    }
-
-    private synchronized void flushInternal() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToService();
-    }
-
-    private synchronized void flushInternalAsync() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToServiceAsync();
-    }
-
-    private synchronized void flushWrittenBytesToService() throws IOException {
-        for (WriteOperation writeOperation : writeOperations) {
-            try {
-                writeOperation.task.get();
-            } catch (Exception ex) {
-                lastError = new IOException(ex);
-                throw lastError;
-            }
-        }
-        LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
-        flushWrittenBytesToServiceInternal(position);
-    }
-
-    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
-        shrinkWriteOperationQueue();
-
-        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
-            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
-        }
-    }
-
-    private static class WriteOperation {
-        private final Future task;
-        private final long startOffset;
-        private final long length;
-
-        WriteOperation(final Future task, final long startOffset, final long length) {
-            Preconditions.checkNotNull(task, "task");
-            Preconditions.checkArgument(startOffset >= 0, "startOffset");
-            Preconditions.checkArgument(length >= 0, "length");
-
-            this.task = task;
-            this.startOffset = startOffset;
-            this.length = length;
-        }
-    }
-
-}
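
Note: after this patch, seaweedfs.client.SeaweedOutputStream no longer depends on Hadoop types (Path, FSExceptionMessages, Guava Preconditions), so the shared write path can be driven from plain Java against a filer, while the hdfs2/hdfs3 SeaweedHadoopOutputStream subclasses keep the Hadoop-facing Syncable/StreamCapabilities surface. A minimal usage sketch of the relocated class follows; the FilerGrpcClient constructor arguments, the filer gRPC port, and the entry setup are illustrative assumptions and are not defined by this diff:

import java.nio.charset.StandardCharsets;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedOutputStream;

public class SeaweedOutputStreamExample {
    public static void main(String[] args) throws Exception {
        // assumed constructor: filer host and gRPC port (hypothetical values)
        FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
        // hypothetical fresh entry for the target file; a real caller would also set attributes
        FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder().setName("hello.txt");
        SeaweedOutputStream out = new SeaweedOutputStream(
                filerGrpcClient,
                "/buckets/test/hello.txt", // full path; getParentDirectory() derives "/buckets/test"
                entry,
                0L,                        // starting write position
                8 * 1024 * 1024,           // buffer size in bytes
                "000");                    // replication setting
        try {
            out.write("hello seaweedfs".getBytes(StandardCharsets.UTF_8));
        } finally {
            out.close();                   // flushes buffered chunks, then writes the entry meta
        }
    }
}

Since supportFlush is still hard-coded to false, flush(), hsync(), and hflush() remain no-ops in both the shared class and the Hadoop subclasses; only close() forces data and metadata out.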