From 170ee6ef0f9a94504580db5fa8c82e4ef6d50a99 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 29 Aug 2019 23:29:10 -0700 Subject: [PATCH] tmp --- .../java/seaweedfs/client/SeaweedRead.java | 2 +- other/java/hdfs2/dependency-reduced-pom.xml | 129 ++++ other/java/hdfs2/pom.xml | 159 +++++ .../main/java/seaweed/hdfs/ReadBuffer.java | 0 .../java/seaweed/hdfs/ReadBufferManager.java | 0 .../java/seaweed/hdfs/ReadBufferStatus.java | 0 .../java/seaweed/hdfs/ReadBufferWorker.java | 0 .../java/seaweed/hdfs/SeaweedFileSystem.java | 0 .../seaweed/hdfs/SeaweedFileSystemStore.java | 0 .../java/seaweed/hdfs/SeaweedInputStream.java | 0 .../seaweed/hdfs/SeaweedOutputStream.java | 283 ++++++++ other/java/hdfs3/dependency-reduced-pom.xml | 129 ++++ other/java/{hdfs => hdfs3}/pom.xml | 0 .../main/java/seaweed/hdfs/ReadBuffer.java | 137 ++++ .../java/seaweed/hdfs/ReadBufferManager.java | 394 +++++++++++ .../java/seaweed/hdfs/ReadBufferStatus.java | 29 + .../java/seaweed/hdfs/ReadBufferWorker.java | 70 ++ .../java/seaweed/hdfs/SeaweedFileSystem.java | 611 ++++++++++++++++++ .../seaweed/hdfs/SeaweedFileSystemStore.java | 277 ++++++++ .../java/seaweed/hdfs/SeaweedInputStream.java | 371 +++++++++++ .../seaweed/hdfs/SeaweedOutputStream.java | 0 21 files changed, 2590 insertions(+), 1 deletion(-) create mode 100644 other/java/hdfs2/dependency-reduced-pom.xml create mode 100644 other/java/hdfs2/pom.xml rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/ReadBuffer.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/ReadBufferManager.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/ReadBufferStatus.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/ReadBufferWorker.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/SeaweedFileSystem.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java (100%) rename other/java/{hdfs => hdfs2}/src/main/java/seaweed/hdfs/SeaweedInputStream.java (100%) create mode 100644 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java create mode 100644 other/java/hdfs3/dependency-reduced-pom.xml rename other/java/{hdfs => hdfs3}/pom.xml (100%) create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java create mode 100644 other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java rename other/java/{hdfs => hdfs3}/src/main/java/seaweed/hdfs/SeaweedOutputStream.java (100%) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a906a689b..a307983bb 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -86,7 +86,7 @@ public class SeaweedRead { return 0; } - public static List viewFromVisibles(List visibleIntervals, long offset, long size) { + protected static List viewFromVisibles(List visibleIntervals, long offset, long size) { List views = new ArrayList<>(); long stop = offset + size; diff 
--git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml new file mode 100644 index 000000000..cfc869312 --- /dev/null +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -0,0 +1,129 @@ + + + + oss-parent + org.sonatype.oss + 9 + ../pom.xml/pom.xml + + 4.0.0 + com.github.chrislusf + seaweedfs-hadoop-client + ${seaweedfs.client.version} + + + + maven-compiler-plugin + + 7 + 7 + + + + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + org/slf4j/** + META-INF/maven/org.slf4j/** + + + + + + + + + com.google + shaded.com.google + + + io.grpc.internal + shaded.io.grpc.internal + + + org.apache.commons + shaded.org.apache.commons + + org.apache.hadoop + org.apache.log4j + + + + + + + + + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + maven-javadoc-plugin + 2.9.1 + + + attach-javadocs + + jar + + + + + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + + 1.1.0 + 2.7.4 + + diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml new file mode 100644 index 000000000..5efb46757 --- /dev/null +++ b/other/java/hdfs2/pom.xml @@ -0,0 +1,159 @@ + + + 4.0.0 + + + 1.1.0 + 2.7.4 + + + com.github.chrislusf + seaweedfs-hadoop-client + ${seaweedfs.client.version} + + + org.sonatype.oss + oss-parent + 9 + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 7 + 7 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + org/slf4j/** + META-INF/maven/org.slf4j/** + + + + + + + + + com.google + shaded.com.google + + + io.grpc.internal + shaded.io.grpc.internal + + + org.apache.commons + shaded.org.apache.commons + + org.apache.hadoop + org.apache.log4j + + + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + + attach-javadocs + + jar + + + + + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + com.github.chrislusf + seaweedfs-client + ${seaweedfs.client.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + + diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java similarity index 100% rename from 
other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java rename to other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java new file mode 100644 index 000000000..7b488a5da --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -0,0 +1,283 @@ +package seaweed.hdfs; + +// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedWrite; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.concurrent.*; + +import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; + +public class SeaweedOutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); + + private final FilerGrpcClient filerGrpcClient; + private final Path path; + private final int bufferSize; + private final int maxConcurrentRequestCount; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService completionService; + private FilerProto.Entry.Builder entry; + private long position; + private boolean closed; + private boolean supportFlush = true; + private volatile IOException lastError; + private long lastFlushOffset; + private long lastTotalAppendOffset = 0; + private byte[] buffer; + private int bufferIndex; + private ConcurrentLinkedDeque writeOperations; + private String replication = "000"; + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { + this.filerGrpcClient = filerGrpcClient; + this.replication = replication; + this.path = path; + this.position = position; + this.closed = false; + this.lastError = null; + 
this.lastFlushOffset = 0; + this.bufferSize = bufferSize; + this.buffer = new byte[bufferSize]; + this.bufferIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + this.entry = entry; + + } + + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { + + LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); + + try { + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + } catch (Exception ex) { + throw new IOException(ex); + } + this.lastFlushOffset = offset; + } + + @Override + public void write(final int byteVal) throws IOException { + write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + @Override + public synchronized void write(final byte[] data, final int off, final int length) + throws IOException { + maybeThrowLastError(); + + Preconditions.checkArgument(data != null, "null data"); + + if (off < 0 || length < 0 || length > data.length - off) { + throw new IndexOutOfBoundsException(); + } + + int currentOffset = off; + int writableBytes = bufferSize - bufferIndex; + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + if (writableBytes <= numberOfBytesToWrite) { + System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); + bufferIndex += writableBytes; + writeCurrentBufferToService(); + currentOffset += writableBytes; + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + } else { + System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); + bufferIndex += numberOfBytesToWrite; + numberOfBytesToWrite = 0; + } + + writableBytes = bufferSize - bufferIndex; + } + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + if (supportFlush) { + flushInternalAsync(); + } + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error encountered caught in threads and stored will be rethrown here + * after cleanup. 
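For orientation, a minimal sketch of how this write path is normally driven through the Hadoop API (a hypothetical helper, not taken from the patch; in practice the stream is obtained via SeaweedFileSystem.create() rather than constructed directly):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStream;

class SeaweedWriteSketch {
    // Hypothetical helper: the path and data are examples only.
    static void writeOneBlock(FileSystem fs, byte[] block) throws IOException {
        try (OutputStream out = fs.create(new Path("/tmp/seaweedfs-demo"))) {
            out.write(block); // buffered locally; each full buffer is uploaded asynchronously
            out.flush();      // pushes buffered bytes out and commits file metadata to the filer
        }                     // close() waits for in-flight uploads, then shuts down the upload pool
    }
}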
+ */ + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + LOG.debug("close path: {}", path); + try { + flushInternal(); + threadExecutor.shutdown(); + } finally { + lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + buffer = null; + bufferIndex = 0; + closed = true; + writeOperations.clear(); + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); + } + } + } + + private synchronized void writeCurrentBufferToService() throws IOException { + if (bufferIndex == 0) { + return; + } + + final byte[] bytes = buffer; + final int bytesLength = bufferIndex; + + buffer = new byte[bufferSize]; + bufferIndex = 0; + final long offset = position; + position += bytesLength; + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + waitForTaskToComplete(); + } + + final Future job = completionService.submit(new Callable() { + @Override + public Void call() throws Exception { + // originally: client.append(path, offset, bytes, 0, bytesLength); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); + return null; + } + }); + + writeOperations.add(new WriteOperation(job, offset, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + } + + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } + + if (!completed) { + try { + completionService.take(); + } catch (InterruptedException e) { + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); + throw lastError; + } + } + } + + private void maybeThrowLastError() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** + * Try to remove the completed write operations from the beginning of write + * operation FIFO queue. 
+ */ + private synchronized void shrinkWriteOperationQueue() throws IOException { + try { + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; + writeOperations.remove(); + } + } catch (Exception e) { + lastError = new IOException(e); + throw lastError; + } + } + + private synchronized void flushInternal() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToService(); + } + + private synchronized void flushInternalAsync() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToServiceAsync(); + } + + private synchronized void flushWrittenBytesToService() throws IOException { + for (WriteOperation writeOperation : writeOperations) { + try { + writeOperation.task.get(); + } catch (Exception ex) { + lastError = new IOException(ex); + throw lastError; + } + } + LOG.debug("flushWrittenBytesToService: {} position:{}", path, position); + flushWrittenBytesToServiceInternal(position); + } + + private synchronized void flushWrittenBytesToServiceAsync() throws IOException { + shrinkWriteOperationQueue(); + + if (this.lastTotalAppendOffset > this.lastFlushOffset) { + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset); + } + } + + private static class WriteOperation { + private final Future task; + private final long startOffset; + private final long length; + + WriteOperation(final Future task, final long startOffset, final long length) { + Preconditions.checkNotNull(task, "task"); + Preconditions.checkArgument(startOffset >= 0, "startOffset"); + Preconditions.checkArgument(length >= 0, "length"); + + this.task = task; + this.startOffset = startOffset; + this.length = length; + } + } + +} diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml new file mode 100644 index 000000000..867a81caf --- /dev/null +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -0,0 +1,129 @@ + + + + oss-parent + org.sonatype.oss + 9 + ../pom.xml/pom.xml + + 4.0.0 + com.github.chrislusf + seaweedfs-hadoop-client + ${seaweedfs.client.version} + + + + maven-compiler-plugin + + 7 + 7 + + + + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + org/slf4j/** + META-INF/maven/org.slf4j/** + + + + + + + + + com.google + shaded.com.google + + + io.grpc.internal + shaded.io.grpc.internal + + + org.apache.commons + shaded.org.apache.commons + + org.apache.hadoop + org.apache.log4j + + + + + + + + + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + maven-javadoc-plugin + 2.9.1 + + + attach-javadocs + + jar + + + + + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + + 1.1.0 + 3.1.1 + + diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs3/pom.xml similarity index 100% rename from other/java/hdfs/pom.xml rename to other/java/hdfs3/pom.xml diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java new file mode 100644 index 000000000..926d0b83b --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seaweed.hdfs; + +import java.util.concurrent.CountDownLatch; + +class ReadBuffer { + + private SeaweedInputStream stream; + private long offset; // offset within the file for the buffer + private int length; // actual length, set after the buffer is filles + private int requestedLength; // requested length of the read + private byte[] buffer; // the buffer itself + private int bufferindex = -1; // index in the buffers array in Buffer manager + private ReadBufferStatus status; // status of the buffer + private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client + // waiting on this buffer gets unblocked + + // fields to help with eviction logic + private long timeStamp = 0; // tick at which buffer became available to read + private boolean isFirstByteConsumed = false; + private boolean isLastByteConsumed = false; + private boolean isAnyByteConsumed = false; + + public SeaweedInputStream getStream() { + return stream; + } + + public void setStream(SeaweedInputStream stream) { + this.stream = stream; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public int getLength() { + return length; + } + + public void setLength(int length) { + this.length = length; + } + + public int getRequestedLength() { + return requestedLength; + } + + public void setRequestedLength(int requestedLength) { + this.requestedLength = requestedLength; + } + + public byte[] getBuffer() { + return buffer; + } + + public void setBuffer(byte[] buffer) { + this.buffer = buffer; + } + + public int getBufferindex() { + return bufferindex; + } + + public void setBufferindex(int bufferindex) { + this.bufferindex = bufferindex; + } + + public ReadBufferStatus getStatus() { + return status; + } + + public void setStatus(ReadBufferStatus status) { + this.status = status; + } + + public CountDownLatch getLatch() { + return latch; + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public boolean isFirstByteConsumed() { + return isFirstByteConsumed; + } + + public void setFirstByteConsumed(boolean isFirstByteConsumed) { + this.isFirstByteConsumed = isFirstByteConsumed; + } + + public boolean isLastByteConsumed() { + return isLastByteConsumed; + } + + public void setLastByteConsumed(boolean isLastByteConsumed) { + this.isLastByteConsumed = isLastByteConsumed; + } + + public boolean isAnyByteConsumed() { + return isAnyByteConsumed; + } + + public void setAnyByteConsumed(boolean isAnyByteConsumed) { + this.isAnyByteConsumed = isAnyByteConsumed; + } + +} diff --git 
a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java new file mode 100644 index 000000000..5b1e21529 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package seaweed.hdfs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; + +/** + * The Read Buffer Manager for Rest AbfsClient. + */ +final class ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); + + private static final int NUM_BUFFERS = 16; + private static final int BLOCK_SIZE = 4 * 1024 * 1024; + private static final int NUM_THREADS = 8; + private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading + private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block + + static { + BUFFER_MANAGER = new ReadBufferManager(); + BUFFER_MANAGER.init(); + } + + static ReadBufferManager getBufferManager() { + return BUFFER_MANAGER; + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("SeaweedFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + // hide instance constructor + private ReadBufferManager() { + } + + + /* + * + * SeaweedInputStream-facing methods + * + */ + + + /** + * {@link SeaweedInputStream} calls this method to queue read-aheads. 
+ * + * @param stream The {@link SeaweedInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which shoukd be read + * @param requestedLength The length to read + */ + void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + + + /** + * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own + * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
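A sketch of the intended caller pattern, inferred from this ABFS-derived design (hypothetical helper; ReadBufferManager is package-private, so the caller is assumed to live in the seaweed.hdfs package, and the parameters are illustrative):

// Hypothetical caller: serve a read from the prefetch cache, then queue the next read-ahead.
static int readWithPrefetch(SeaweedInputStream stream, long position, int len,
                            byte[] dest, int blockSize) {
    ReadBufferManager manager = ReadBufferManager.getBufferManager();

    // Serve the read from an already-prefetched buffer when one covers this offset.
    int fromCache = manager.getBlock(stream, position, len, dest);

    // Queue read-ahead of the following block so later sequential reads can hit the cache.
    manager.queueReadAhead(stream, position + len, blockSize);

    return fromCache; // 0 means cache miss: the caller falls back to its own remote read
}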
+ * @return the number of bytes read + */ + int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + /* + * + * Internal methods + * + */ + + private void waitForProcess(final SeaweedInputStream stream, final long position) { + ReadBuffer readBuf; + synchronized (this) { + clearFromReadAheadQueue(stream, position); + readBuf = getFromList(inProgressList, stream, position); + } + if (readBuf != null) { // if in in-progress queue, then block for it + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", + stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); + } + readBuf.getLatch().await(); // blocking wait on the caller stream's thread + // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread + // is done processing it (in doneReading). There, the latch is set after removing the buffer from + // inProgressList. So this latch is safe to be outside the synchronized block. + // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock + // while waiting, so no one will be able to change any state. If this becomes more complex in the future, + // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("latch done for file {} buffer idx {} length {}", + stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + } + } + } + + /** + * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. + * The objective is to find just one buffer - there is no advantage to evicting more than one. + * + * @return whether the eviction succeeeded - i.e., were we able to free up one buffer + */ + private synchronized boolean tryEvict() { + ReadBuffer nodeToEvict = null; + if (completedReadList.size() <= 0) { + return false; // there are no evict-able buffers + } + + // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + for (ReadBuffer buf : completedReadList) { + if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + nodeToEvict = buf; + break; + } + } + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try buffers where any bytes have been consumed (may be a bad idea? 
have to experiment and see) + for (ReadBuffer buf : completedReadList) { + if (buf.isAnyByteConsumed()) { + nodeToEvict = buf; + break; + } + } + + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try any old nodes that have not been consumed + long earliestBirthday = Long.MAX_VALUE; + for (ReadBuffer buf : completedReadList) { + if (buf.getTimeStamp() < earliestBirthday) { + nodeToEvict = buf; + earliestBirthday = buf.getTimeStamp(); + } + } + if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { + return evict(nodeToEvict); + } + + // nothing can be evicted + return false; + } + + private boolean evict(final ReadBuffer buf) { + freeList.push(buf.getBufferindex()); + completedReadList.remove(buf); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", + buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + } + return true; + } + + private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { + // returns true if any part of the buffer is already queued + return (isInList(readAheadQueue, stream, requestedOffset) + || isInList(inProgressList, stream, requestedOffset) + || isInList(completedReadList, stream, requestedOffset)); + } + + private boolean isInList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { + return (getFromList(list, stream, requestedOffset) != null); + } + + private ReadBuffer getFromList(final Collection list, final SeaweedInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : list) { + if (buffer.getStream() == stream) { + if (buffer.getStatus() == ReadBufferStatus.AVAILABLE + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getLength()) { + return buffer; + } else if (requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + return null; + } + + private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { + ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); + if (buffer != null) { + readAheadQueue.remove(buffer); + notifyAll(); // lock is held in calling method + freeList.push(buffer.getBufferindex()); + } + } + + private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, + final byte[] buffer) { + ReadBuffer buf = getFromList(completedReadList, stream, position); + if (buf == null || position >= buf.getOffset() + buf.getLength()) { + return 0; + } + int cursor = (int) (position - buf.getOffset()); + int availableLengthInBuffer = buf.getLength() - cursor; + int lengthToCopy = Math.min(length, availableLengthInBuffer); + System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); + if (cursor == 0) { + buf.setFirstByteConsumed(true); + } + if (cursor + lengthToCopy == buf.getLength()) { + buf.setLastByteConsumed(true); + } + buf.setAnyByteConsumed(true); + return lengthToCopy; + } + + /* + * + * ReadBufferWorker-thread-facing methods + * + */ + + /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. 
+ * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + //buffer = readAheadQueue.take(); // blocking method + while (readAheadQueue.size() == 0) { + wait(); + } + buffer = readAheadQueue.remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + inProgressList.add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); + } + return buffer; + } + + /** + * ReadBufferWorker thread calls this method to post completion. + * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + */ + void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); + } + synchronized (this) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setTimeStamp(currentTimeMillis()); + buffer.setLength(bytesActuallyRead); + completedReadList.add(buffer); + } else { + freeList.push(buffer.getBufferindex()); + // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC + } + } + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) + } + + /** + * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). + * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), + * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. + * Note: it is not monotonic across Sockets, and even within a CPU, its only the + * more recent parts which share a clock across all cores. + * + * @return current time in milliseconds + */ + private long currentTimeMillis() { + return System.nanoTime() / 1000 / 1000; + } +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java new file mode 100644 index 000000000..d63674977 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seaweed.hdfs; + +/** + * The ReadBufferStatus for Rest AbfsClient + */ +public enum ReadBufferStatus { + NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats + READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList + AVAILABLE, // data is available in buffer. It should be in completedList + READ_FAILED // read completed, but failed. +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java new file mode 100644 index 000000000..6ffbc4644 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package seaweed.hdfs; + +import java.util.concurrent.CountDownLatch; + +class ReadBufferWorker implements Runnable { + + protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); + private int id; + + ReadBufferWorker(final int id) { + this.id = id; + } + + /** + * return the ID of ReadBufferWorker. + */ + public int getId() { + return this.id; + } + + /** + * Waits until a buffer becomes available in ReadAheadQueue. + * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. + * Rinse and repeat. Forever. + */ + public void run() { + try { + UNLEASH_WORKERS.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBuffer buffer; + while (true) { + try { + buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + if (buffer != null) { + try { + // do the actual read, from the file. 
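+ // readRemote() is expected to fill the prefetch buffer from index 0 and return the number of bytes actually read;
+ // doneReading() then publishes the buffer as AVAILABLE with that length when bytes were read, otherwise the buffer slot goes back to the free list.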
+ int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager + } catch (Exception ex) { + bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); + } + } + } + } +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java new file mode 100644 index 000000000..453924cf7 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -0,0 +1,611 @@ +package seaweed.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; + +public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { + + public static final int FS_SEAWEED_DEFAULT_PORT = 8888; + public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; + public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca"; + public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key"; + public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert"; + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); + private static int BUFFER_SIZE = 16 * 1024 * 1024; + + private URI uri; + private Path workingDirectory = new Path("/"); + private SeaweedFileSystemStore seaweedFileSystemStore; + + public URI getUri() { + return uri; + } + + public String getScheme() { + return "seaweedfs"; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { // get + super.initialize(uri, conf); + + // get host information from uri (overrides info in conf) + String host = uri.getHost(); + host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; + if (host == null) { + throw new IOException("Invalid host specified"); + } + conf.set(FS_SEAWEED_FILER_HOST, host); + + // get port information from uri, (overrides info in conf) + int port = uri.getPort(); + port = (port == -1) ? 
FS_SEAWEED_DEFAULT_PORT : port; + conf.setInt(FS_SEAWEED_FILER_PORT, port); + + conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); + + setConf(conf); + this.uri = uri; + + if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0 + && conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0 + && conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) { + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, + conf.get(FS_SEAWEED_GRPC_CA), + conf.get(FS_SEAWEED_GRPC_CLIENT_CERT), + conf.get(FS_SEAWEED_GRPC_CLIENT_KEY)); + } else { + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + } + + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + + LOG.debug("open path: {} bufferSize:{}", path, bufferSize); + + path = qualify(path); + + try { + InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); + return new FSDataInputStream(inputStream); + } catch (Exception ex) { + return null; + } + } + + @Override + public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, + final short replication, final long blockSize, final Progressable progress) throws IOException { + + LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); + + path = qualify(path); + + try { + String replicaPlacement = String.format("%03d", replication - 1); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); + return new FSDataOutputStream(outputStream, statistics); + } catch (Exception ex) { + return null; + } + } + + @Override + public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { + + LOG.debug("append path: {} bufferSize:{}", path, bufferSize); + + path = qualify(path); + try { + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); + return new FSDataOutputStream(outputStream, statistics); + } catch (Exception ex) { + return null; + } + } + + @Override + public boolean rename(Path src, Path dst) { + + LOG.debug("rename path: {} => {}", src, dst); + + if (src.isRoot()) { + return false; + } + + if (src.equals(dst)) { + return true; + } + FileStatus dstFileStatus = getFileStatus(dst); + + String sourceFileName = src.getName(); + Path adjustedDst = dst; + + if (dstFileStatus != null) { + if (!dstFileStatus.isDirectory()) { + return false; + } + adjustedDst = new Path(dst, sourceFileName); + } + + Path qualifiedSrcPath = qualify(src); + Path qualifiedDstPath = qualify(adjustedDst); + + seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); + return true; + } + + @Override + public boolean delete(Path path, boolean recursive) { + + LOG.debug("delete path: {} recursive:{}", path, recursive); + + path = qualify(path); + + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus == null) { + return true; + } + + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); + + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + + LOG.debug("listStatus path: {}", path); + + path = qualify(path); + + return seaweedFileSystemStore.listEntries(path); + } + + @Override + public Path getWorkingDirectory() { + return workingDirectory; + } + + @Override + public void 
setWorkingDirectory(Path path) { + if (path.isAbsolute()) { + workingDirectory = path; + } else { + workingDirectory = new Path(workingDirectory, path); + } + } + + @Override + public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { + + LOG.debug("mkdirs path: {}", path); + + path = qualify(path); + + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus == null) { + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + return seaweedFileSystemStore.createDirectory(path, currentUser, + fsPermission == null ? FsPermission.getDirDefault() : fsPermission, + FsPermission.getUMask(getConf())); + + } + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } + + @Override + public FileStatus getFileStatus(Path path) { + + LOG.debug("getFileStatus path: {}", path); + + path = qualify(path); + + return seaweedFileSystemStore.getFileStatus(path); + } + + /** + * Set owner of a path (i.e. a file or a directory). + * The parameters owner and group cannot both be null. + * + * @param path The path + * @param owner If it is null, the original username remains unchanged. + * @param group If it is null, the original groupname remains unchanged. + */ + @Override + public void setOwner(Path path, final String owner, final String group) + throws IOException { + LOG.debug("setOwner path: {}", path); + path = qualify(path); + + seaweedFileSystemStore.setOwner(path, owner, group); + } + + + /** + * Set permission of a path. + * + * @param path The path + * @param permission Access permission + */ + @Override + public void setPermission(Path path, final FsPermission permission) throws IOException { + LOG.debug("setPermission path: {}", path); + + if (permission == null) { + throw new IllegalArgumentException("The permission can't be null"); + } + + path = qualify(path); + + seaweedFileSystemStore.setPermission(path, permission); + } + + Path qualify(Path path) { + return path.makeQualified(uri, workingDirectory); + } + + /** + * Concat existing files together. + * + * @param trg the path to the target destination. + * @param psrcs the paths to the sources to use for the concatenation. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + */ + @Override + public void concat(final Path trg, final Path[] psrcs) throws IOException { + throw new UnsupportedOperationException("Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); + } + + /** + * Truncate the file in the indicated path to the indicated size. + *

+     * <ul>
+     *   <li>Fails if path is a directory.</li>
+     *   <li>Fails if path does not exist.</li>
+     *   <li>Fails if path is not closed.</li>
+     *   <li>Fails if new size is greater than current size.</li>
+     * </ul>
+ * + * @param f The path to the file to be truncated + * @param newLength The size the file is to be truncated to + * @return true if the file has been truncated to the desired + * newLength and is immediately available to be reused for + * write operations such as append, or + * false if a background process of adjusting the length of + * the last block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + */ + @Override + public boolean truncate(Path f, long newLength) throws IOException { + throw new UnsupportedOperationException("Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); + } + + @Override + public void createSymlink(final Path target, final Path link, + final boolean createParent) throws AccessControlException, + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, UnsupportedFileSystemException, + IOException { + // Supporting filesystems should override this method + throw new UnsupportedOperationException( + "Filesystem does not support symlinks!"); + } + + public boolean supportsSymlinks() { + return false; + } + + /** + * Create a snapshot. + * + * @param path The directory where snapshots will be taken. + * @param snapshotName The name of the snapshot + * @return the snapshot path. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + */ + @Override + public Path createSnapshot(Path path, String snapshotName) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support createSnapshot"); + } + + /** + * Rename a snapshot. + * + * @param path The directory path where the snapshot was taken + * @param snapshotOldName Old name of the snapshot + * @param snapshotNewName New name of the snapshot + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void renameSnapshot(Path path, String snapshotOldName, + String snapshotNewName) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support renameSnapshot"); + } + + /** + * Delete a snapshot of a directory. + * + * @param path The directory that the to-be-deleted snapshot belongs to + * @param snapshotName The name of the snapshot + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void deleteSnapshot(Path path, String snapshotName) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support deleteSnapshot"); + } + + /** + * Modifies ACL entries of files and directories. This method can add new ACL + * entries or modify the permissions on existing ACL entries. All existing + * ACL entries that are not specified in this call are retained without + * changes. (Modifications are merged into the current ACL.) + * + * @param path Path to modify + * @param aclSpec List<AclEntry> describing modifications + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). 
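Stepping back from these unsupported-operation stubs: the configuration keys declared at the top of this class are what a client sets to reach the filer. A hypothetical wiring sketch (the host, port, and the fs.seaweedfs.impl mapping are examples/assumptions, not taken from this patch):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeaweedFsConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.seaweed.filer.host", "localhost"); // FS_SEAWEED_FILER_HOST
        conf.setInt("fs.seaweed.filer.port", 8888);     // FS_SEAWEED_FILER_PORT; the store derives the gRPC port as 10000 + this
        // Assumed to be needed unless the jar registers the scheme itself:
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");

        FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
    }
}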
+ */ + @Override + public void modifyAclEntries(Path path, List aclSpec) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support modifyAclEntries"); + } + + /** + * Removes ACL entries from files and directories. Other ACL entries are + * retained. + * + * @param path Path to modify + * @param aclSpec List describing entries to remove + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeAclEntries(Path path, List aclSpec) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeAclEntries"); + } + + /** + * Removes all default ACL entries from files and directories. + * + * @param path Path to modify + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeDefaultAcl(Path path) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeDefaultAcl"); + } + + /** + * Removes all but the base ACL entries of files and directories. The entries + * for user, group, and others are retained for compatibility with permission + * bits. + * + * @param path Path to modify + * @throws IOException if an ACL could not be removed + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeAcl(Path path) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeAcl"); + } + + /** + * Fully replaces ACL of files and directories, discarding all existing + * entries. + * + * @param path Path to modify + * @param aclSpec List describing modifications, which must include entries + * for user, group, and others for compatibility with permission bits. + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void setAcl(Path path, List aclSpec) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support setAcl"); + } + + /** + * Gets the ACL of a file or directory. + * + * @param path Path to get + * @return AclStatus describing the ACL of the file or directory + * @throws IOException if an ACL could not be read + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public AclStatus getAclStatus(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getAclStatus"); + } + + /** + * Set an xattr of a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to modify + * @param name xattr name. + * @param value xattr value. + * @param flag xattr set flag + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void setXAttr(Path path, String name, byte[] value, + EnumSet<XAttrSetFlag> flag) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support setXAttr"); + } + + /** + * Get an xattr name and value for a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attribute + * @param name xattr name. + * @return byte[] xattr value. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public byte[] getXAttr(Path path, String name) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttr"); + } + + /** + * Get all of the xattr name/value pairs for a file or directory. + * Only those xattrs which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public Map<String, byte[]> getXAttrs(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttrs"); + } + + /** + * Get all of the xattrs name/value pairs for a file or directory. + * Only those xattrs which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @param names XAttr names. + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public Map<String, byte[]> getXAttrs(Path path, List<String> names) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttrs"); + } + + /** + * Get all of the xattr names for a file or directory. + * Only those xattr names which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @return List{@literal <String>} of the XAttr names of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public List<String> listXAttrs(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support listXAttrs"); + } + + /** + * Remove an xattr of a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to remove extended attribute + * @param name xattr name + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeXAttr(Path path, String name) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeXAttr"); + } + +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java new file mode 100644 index 000000000..643467898 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -0,0 +1,277 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedRead; + +import javax.net.ssl.SSLException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class SeaweedFileSystemStore { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); + + private FilerGrpcClient filerGrpcClient; + private FilerClient filerClient; + + public SeaweedFileSystemStore(String host, int port) { + int grpcPort = 10000 + port; + filerGrpcClient = new FilerGrpcClient(host, grpcPort); + filerClient = new FilerClient(filerGrpcClient); + } + + public SeaweedFileSystemStore(String host, int port, + String caFile, String clientCertFile, String clientKeyFile) throws SSLException { + int grpcPort = 10000 + port; + filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile); + filerClient = new FilerClient(filerGrpcClient); + } + + public static String getParentDirectory(Path path) { + return path.isRoot() ? "/" : path.getParent().toUri().getPath(); + } + + static int permissionToMode(FsPermission permission, boolean isDirectory) { + int p = permission.toShort(); + if (isDirectory) { + p = p | 1 << 31; + } + return p; + } + + public boolean createDirectory(final Path path, UserGroupInformation currentUser, + final FsPermission permission, final FsPermission umask) { + + LOG.debug("createDirectory path: {} permission: {} umask: {}", + path, + permission, + umask); + + return filerClient.mkdirs( + path.toUri().getPath(), + permissionToMode(permission, true), + currentUser.getUserName(), + currentUser.getGroupNames() + ); + } + + public FileStatus[] listEntries(final Path path) { + LOG.debug("listEntries path: {}", path); + + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); + + List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); + + for (FilerProto.Entry entry : entries) { + + FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry); + + fileStatuses.add(fileStatus); + } + return fileStatuses.toArray(new FileStatus[0]); + } + + public FileStatus getFileStatus(final Path path) { + + FilerProto.Entry entry = lookupEntry(path); + if (entry == null) { + return null; + } + LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); + + FileStatus fileStatus = doGetFileStatus(path, entry); + return fileStatus; + } + + public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { + LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", + path, + String.valueOf(isDirectory), + String.valueOf(recursive)); + + if (path.isRoot()) { + return true; + } + + if (recursive && isDirectory) { + List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); + for (FilerProto.Entry entry : entries) { + deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true); + } + } + + return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive); + } + + private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { + FilerProto.FuseAttributes attributes = entry.getAttributes(); + long length = SeaweedRead.totalSize(entry.getChunksList()); + boolean isDir = entry.getIsDirectory(); + int block_replication = 1; + int blocksize = 512; + long modification_time = attributes.getMtime() * 1000; // milliseconds + long access_time = 0; + FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); + String owner = attributes.getUserName(); + String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; + return new FileStatus(length, isDir, block_replication, blocksize, + modification_time, access_time, permission, owner, group, null, path); + } + + private FilerProto.Entry lookupEntry(Path path) { + + return filerClient.lookupEntry(getParentDirectory(path), path.getName()); + + } + + public void rename(Path source, Path destination) { + + LOG.debug("rename source: {} destination:{}", source, destination); + + if (source.isRoot()) { + return; + } + LOG.warn("rename lookupEntry source: {}", source); + FilerProto.Entry entry = lookupEntry(source); + if (entry == null) { + LOG.warn("rename non-existing source: {}", source); + return; + } + filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); + } + + public OutputStream createFile(final Path path, + final boolean overwrite, + FsPermission permission, + int bufferSize, + String replication) throws IOException { + + permission = permission == null ?
FsPermission.getFileDefault() : permission; + + LOG.debug("createFile path: {} overwrite: {} permission: {}", + path, + overwrite, + permission.toString()); + + UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); + long now = System.currentTimeMillis() / 1000L; + + FilerProto.Entry.Builder entry = null; + long writePosition = 0; + if (!overwrite) { + // appending: reuse the existing entry (if any) and continue writing at its current size + FilerProto.Entry existingEntry = lookupEntry(path); + LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry); + if (existingEntry != null) { + entry = FilerProto.Entry.newBuilder(); + entry.mergeFrom(existingEntry); + entry.getAttributesBuilder().setMtime(now); + LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); + writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + replication = existingEntry.getAttributes().getReplication(); + } + } + if (entry == null) { + entry = FilerProto.Entry.newBuilder() + .setName(path.getName()) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(permissionToMode(permission, false)) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .setUserName(userGroupInformation.getUserName()) + .clearGroupName() + .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) + ); + } + + return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); + + } + + public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, + int bufferSize) throws IOException { + + LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + + int readAheadQueueDepth = 2; + FilerProto.Entry entry = lookupEntry(path); + + if (entry == null) { + throw new FileNotFoundException("read non-exist file " + path); + } + + return new SeaweedInputStream(filerGrpcClient, + statistics, + path.toUri().getPath(), + entry, + bufferSize, + readAheadQueueDepth); + } + + public void setOwner(Path path, String owner, String group) { + + LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group); + + FilerProto.Entry entry = lookupEntry(path); + if (entry == null) { + LOG.debug("setOwner path:{} entry:{}", path, entry); + return; + } + + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); + + if (owner != null) { + attributesBuilder.setUserName(owner); + } + if (group != null) { + attributesBuilder.clearGroupName(); + attributesBuilder.addGroupName(group); + } + + entryBuilder.setAttributes(attributesBuilder); + + LOG.debug("setOwner path:{} entry:{}", path, entryBuilder); + + filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); + + } + + public void setPermission(Path path, FsPermission permission) { + + LOG.debug("setPermission path:{} permission:{}", path, permission); + + FilerProto.Entry entry = lookupEntry(path); + if (entry == null) { + LOG.debug("setPermission path:{} entry:{}", path, entry); + return; + } + + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); + + attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory())); + + entryBuilder.setAttributes(attributesBuilder); + + LOG.debug("setPermission path:{} entry:{}", path, entryBuilder); + + filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); + + } +} diff --git 
a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java new file mode 100644 index 000000000..90c14c772 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -0,0 +1,371 @@ +package seaweed.hdfs; + +// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedRead; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +public class SeaweedInputStream extends FSInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); + + private final FilerGrpcClient filerGrpcClient; + private final Statistics statistics; + private final String path; + private final FilerProto.Entry entry; + private final List<SeaweedRead.VisibleInterval> visibleIntervalList; + private final long contentLength; + private final int bufferSize; // default buffer size + private final int readAheadQueueDepth; // initialized in constructor + private final boolean readAheadEnabled; // whether read-ahead is enabled + + private byte[] buffer = null; // will be initialized on first use + + private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server + private long fCursorAfterLastRead = -1; + private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer + private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 + // of valid bytes in buffer) + private boolean closed = false; + + public SeaweedInputStream( + final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry, + final int bufferSize, + final int readAheadQueueDepth) { + this.filerGrpcClient = filerGrpcClient; + this.statistics = statistics; + this.path = path; + this.entry = entry; + this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); + this.bufferSize = bufferSize; + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); + this.readAheadEnabled = true; + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + + public String getPath() { + return path; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int numberOfBytesRead = read(b, 0, 1); + if (numberOfBytesRead < 0) { + return -1; + } else { + return (b[0] & 0xFF); + } + } + + @Override + public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + int currentOff = off; + int currentLen = len; + int lastReadBytes; + int totalReadBytes = 0; + do { + lastReadBytes = readOneBlock(b, currentOff, currentLen); + if (lastReadBytes > 0) { + currentOff += lastReadBytes; + currentLen -= lastReadBytes; + totalReadBytes += lastReadBytes; + } + if (currentLen <= 0 || currentLen > b.length - currentOff) { + break; + } + } while (lastReadBytes > 0); + return totalReadBytes > 0 ?
totalReadBytes : lastReadBytes; + } + + private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + + Preconditions.checkNotNull(b); + + if (len == 0) { + return 0; + } + + if (this.available() == 0) { + return -1; + } + + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + //If buffer is empty, then fill the buffer. + if (bCursor == limit) { + //If EOF, then return -1 + if (fCursor >= contentLength) { + return -1; + } + + long bytesRead = 0; + //reset buffer to initial state - i.e., throw away existing data + bCursor = 0; + limit = 0; + if (buffer == null) { + buffer = new byte[bufferSize]; + } + + // Enable readAhead when reading sequentially + if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { + bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); + } else { + bytesRead = readInternal(fCursor, buffer, 0, b.length, true); + } + + if (bytesRead == -1) { + return -1; + } + + limit += bytesRead; + fCursor += bytesRead; + fCursorAfterLastRead = fCursor; + } + + //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) + //(bytes returned may be less than requested) + int bytesRemaining = limit - bCursor; + int bytesToRead = Math.min(len, bytesRemaining); + System.arraycopy(buffer, bCursor, b, off, bytesToRead); + bCursor += bytesToRead; + if (statistics != null) { + statistics.incrementBytesRead(bytesToRead); + } + return bytesToRead; + } + + + private int readInternal(final long position, final byte[] b, final int offset, final int length, + final boolean bypassReadAhead) throws IOException { + if (readAheadEnabled && !bypassReadAhead) { + // try reading from read-ahead + if (offset != 0) { + throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); + } + int receivedBytes; + + // queue read-aheads + int numReadAheads = this.readAheadQueueDepth; + long nextSize; + long nextOffset = position; + while (numReadAheads > 0 && nextOffset < contentLength) { + nextSize = Math.min((long) bufferSize, contentLength - nextOffset); + ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); + nextOffset = nextOffset + nextSize; + numReadAheads--; + } + + // try reading from buffers first + receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + if (receivedBytes > 0) { + return receivedBytes; + } + + // got nothing from read-ahead, do our own read now + receivedBytes = readRemote(position, b, offset, length); + return receivedBytes; + } else { + return readRemote(position, b, offset, length); + } + } + + int readRemote(long position, byte[] b, int offset, int length) throws IOException { + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + if (b == null) { + throw new IllegalArgumentException("null byte array passed in to read() method"); + } + if (offset >= b.length) { + throw new IllegalArgumentException("offset greater than length of array"); + } + if (length < 0) { + throw new IllegalArgumentException("requested read length is less than zero"); + } + if (length > (b.length - offset)) { + throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); 
+ } + + long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + if (bytesRead > Integer.MAX_VALUE) { + throw new IOException("Unexpected Content-Length"); + } + return (int) bytesRead; + } + + /** + * Seek to given position in stream. + * + * @param n position to seek to + * @throws IOException if there is an error + * @throws EOFException if attempting to seek past end of file + */ + @Override + public synchronized void seek(long n) throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + if (n < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + if (n > contentLength) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + + if (n >= fCursor - limit && n <= fCursor) { // within buffer + bCursor = (int) (n - (fCursor - limit)); + return; + } + + // next read will read from here + fCursor = n; + + //invalidate buffer + limit = 0; + bCursor = 0; + } + + @Override + public synchronized long skip(long n) throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + long currentPos = getPos(); + if (currentPos == contentLength) { + if (n > 0) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + } + } + long newPos = currentPos + n; + if (newPos < 0) { + newPos = 0; + n = newPos - currentPos; + } + if (newPos > contentLength) { + newPos = contentLength; + n = newPos - currentPos; + } + seek(newPos); + return n; + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + *

+ * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + if (closed) { + throw new IOException( + FSExceptionMessages.STREAM_IS_CLOSED); + } + final long remaining = this.contentLength - this.getPos(); + return remaining <= Integer.MAX_VALUE + ? (int) remaining : Integer.MAX_VALUE; + } + + /** + * Returns the length of the file that this stream refers to. Note that the length returned is the length + * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * they won't be reflected in the returned length. + * + * @return length of the file. + * @throws IOException if the stream is closed + */ + public long length() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + return contentLength; + } + + /** + * Return the current offset from the start of the file. + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public synchronized long getPos() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + return fCursor - limit + bCursor; + } + + /** + * Seeks a different copy of the data. Returns true if + * found a new source, false otherwise. + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public synchronized void close() throws IOException { + closed = true; + buffer = null; // de-reference the buffer so it can be GC'ed sooner + } + + /** + * Not supported by this stream. Throws {@link UnsupportedOperationException} + * + * @param readlimit ignored + */ + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * Not supported by this stream. Throws {@link UnsupportedOperationException} + */ + @Override + public synchronized void reset() throws IOException { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * Gets whether mark and reset are supported by {@code SeaweedInputStream}. Always returns false. + * + * @return always {@code false} + */ + @Override + public boolean markSupported() { + return false; + } +} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java similarity index 100% rename from other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java rename to other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
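
Reviewer note (not part of the patch): a minimal sketch of how the hdfs3 module above is expected to be wired into a Hadoop client. The seaweedfs:// scheme, the fs.seaweedfs.impl key, the filer address localhost:8888 and the sample path are assumptions for illustration; the store itself derives the gRPC port as filer port + 10000, as shown in SeaweedFileSystemStore.

// Hedged usage sketch, assuming the shaded seaweedfs-hadoop-client jar is on the
// classpath and a filer is reachable at localhost:8888. Names here are examples only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SeaweedFsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the (assumed) seaweedfs:// scheme to the FileSystem implementation in this patch.
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        try (FileSystem fs = FileSystem.get(new URI("seaweedfs://localhost:8888/"), conf);
             FSDataInputStream in = fs.open(new Path("/test/hello.txt"))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf, 0, buf.length)) > 0) {
                System.out.write(buf, 0, n); // exercises SeaweedInputStream.read(byte[], int, int)
            }
        }
    }
}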
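
A second note on the mode encoding used by permissionToMode in SeaweedFileSystemStore: the POSIX permission bits from FsPermission occupy the low bits of the filer's file mode, and bit 31 is set to flag a directory. The small self-contained sketch below only illustrates that convention; the class and constant names are invented for the example and are not part of the patch.

// Illustration only: mirrors the bit layout used by permissionToMode above.
public class ModeBitsDemo {
    private static final int DIRECTORY_BIT = 1 << 31; // same flag the store sets for directories

    static int toMode(short permissionBits, boolean isDirectory) {
        int mode = permissionBits;                     // e.g. 0755 for rwxr-xr-x
        return isDirectory ? (mode | DIRECTORY_BIT) : mode;
    }

    public static void main(String[] args) {
        int fileMode = toMode((short) 0644, false);
        int dirMode = toMode((short) 0755, true);
        System.out.printf("file mode: %o%n", fileMode);                               // prints 644
        System.out.printf("directory bit set: %b%n", (dirMode & DIRECTORY_BIT) != 0); // prints true
    }
}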