Chris Lu
4 years ago
3 changed files with 2 additions and 408 deletions
-
404other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java
-
3other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
3other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@ -1,404 +0,0 @@ |
|||||
package seaweedfs.client; |
|
||||
|
|
||||
/* |
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||
* you may not use this file except in compliance with the License. |
|
||||
* You may obtain a copy of the License at |
|
||||
* |
|
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
|
||||
* |
|
||||
* Unless required by applicable law or agreed to in writing, software |
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||
* See the License for the specific language governing permissions and |
|
||||
* limitations under the License. |
|
||||
*/ |
|
||||
// package org.apache.spark.io; |
|
||||
|
|
||||
import com.google.common.base.Preconditions; |
|
||||
import com.google.common.base.Throwables; |
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
|
||||
import org.slf4j.Logger; |
|
||||
import org.slf4j.LoggerFactory; |
|
||||
|
|
||||
import javax.annotation.concurrent.GuardedBy; |
|
||||
import java.io.EOFException; |
|
||||
import java.io.IOException; |
|
||||
import java.io.InputStream; |
|
||||
import java.io.InterruptedIOException; |
|
||||
import java.nio.ByteBuffer; |
|
||||
import java.util.concurrent.ExecutorService; |
|
||||
import java.util.concurrent.Executors; |
|
||||
import java.util.concurrent.TimeUnit; |
|
||||
import java.util.concurrent.atomic.AtomicBoolean; |
|
||||
import java.util.concurrent.locks.Condition; |
|
||||
import java.util.concurrent.locks.ReentrantLock; |
|
||||
|
|
||||
/** |
|
||||
* {@link InputStream} implementation which asynchronously reads ahead from the underlying input |
|
||||
* stream when specified amount of data has been read from the current buffer. It does it by |
|
||||
* maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data |
|
||||
* which should be returned when a read() call is issued. The read ahead buffer is used to |
|
||||
* asynchronously read from the underlying input stream and once the current active buffer is |
|
||||
* exhausted, we flip the two buffers so that we can start reading from the read ahead buffer |
|
||||
* without being blocked in disk I/O. |
|
||||
*/ |
|
||||
public class ReadAheadInputStream extends InputStream { |
|
||||
|
|
||||
private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); |
|
||||
|
|
||||
private ReentrantLock stateChangeLock = new ReentrantLock(); |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
private ByteBuffer activeBuffer; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
private ByteBuffer readAheadBuffer; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
private boolean endOfStream; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
// true if async read is in progress |
|
||||
private boolean readInProgress; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
// true if read is aborted due to an exception in reading from underlying input stream. |
|
||||
private boolean readAborted; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
private Throwable readException; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
// whether the close method is called. |
|
||||
private boolean isClosed; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
// true when the close method will close the underlying input stream. This is valid only if |
|
||||
// `isClosed` is true. |
|
||||
private boolean isUnderlyingInputStreamBeingClosed; |
|
||||
|
|
||||
@GuardedBy("stateChangeLock") |
|
||||
// whether there is a read ahead task running, |
|
||||
private boolean isReading; |
|
||||
|
|
||||
// whether there is a reader waiting for data. |
|
||||
private AtomicBoolean isWaiting = new AtomicBoolean(false); |
|
||||
|
|
||||
private final InputStream underlyingInputStream; |
|
||||
|
|
||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor( |
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() |
|
||||
); |
|
||||
|
|
||||
private final Condition asyncReadComplete = stateChangeLock.newCondition(); |
|
||||
|
|
||||
private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); |
|
||||
|
|
||||
/** |
|
||||
* Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead |
|
||||
* threshold |
|
||||
* |
|
||||
* @param inputStream The underlying input stream. |
|
||||
* @param bufferSizeInBytes The buffer size. |
|
||||
*/ |
|
||||
public ReadAheadInputStream( |
|
||||
InputStream inputStream, int bufferSizeInBytes) { |
|
||||
Preconditions.checkArgument(bufferSizeInBytes > 0, |
|
||||
"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); |
|
||||
activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); |
|
||||
readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); |
|
||||
this.underlyingInputStream = inputStream; |
|
||||
activeBuffer.flip(); |
|
||||
readAheadBuffer.flip(); |
|
||||
} |
|
||||
|
|
||||
private boolean isEndOfStream() { |
|
||||
return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); |
|
||||
} |
|
||||
|
|
||||
private void checkReadException() throws IOException { |
|
||||
if (readAborted) { |
|
||||
Throwables.propagateIfPossible(readException, IOException.class); |
|
||||
throw new IOException(readException); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
/** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ |
|
||||
private void readAsync() throws IOException { |
|
||||
stateChangeLock.lock(); |
|
||||
final byte[] arr = readAheadBuffer.array(); |
|
||||
try { |
|
||||
if (endOfStream || readInProgress) { |
|
||||
return; |
|
||||
} |
|
||||
checkReadException(); |
|
||||
readAheadBuffer.position(0); |
|
||||
readAheadBuffer.flip(); |
|
||||
readInProgress = true; |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
executorService.execute(() -> { |
|
||||
stateChangeLock.lock(); |
|
||||
try { |
|
||||
if (isClosed) { |
|
||||
readInProgress = false; |
|
||||
return; |
|
||||
} |
|
||||
// Flip this so that the close method will not close the underlying input stream when we |
|
||||
// are reading. |
|
||||
isReading = true; |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
|
|
||||
// Please note that it is safe to release the lock and read into the read ahead buffer |
|
||||
// because either of following two conditions will hold - 1. The active buffer has |
|
||||
// data available to read so the reader will not read from the read ahead buffer. |
|
||||
// 2. This is the first time read is called or the active buffer is exhausted, |
|
||||
// in that case the reader waits for this async read to complete. |
|
||||
// So there is no race condition in both the situations. |
|
||||
int read = 0; |
|
||||
int off = 0, len = arr.length; |
|
||||
Throwable exception = null; |
|
||||
try { |
|
||||
// try to fill the read ahead buffer. |
|
||||
// if a reader is waiting, possibly return early. |
|
||||
do { |
|
||||
read = underlyingInputStream.read(arr, off, len); |
|
||||
if (read <= 0) break; |
|
||||
off += read; |
|
||||
len -= read; |
|
||||
} while (len > 0 && !isWaiting.get()); |
|
||||
} catch (Throwable ex) { |
|
||||
exception = ex; |
|
||||
if (ex instanceof Error) { |
|
||||
// `readException` may not be reported to the user. Rethrow Error to make sure at least |
|
||||
// The user can see Error in UncaughtExceptionHandler. |
|
||||
throw (Error) ex; |
|
||||
} |
|
||||
} finally { |
|
||||
stateChangeLock.lock(); |
|
||||
readAheadBuffer.limit(off); |
|
||||
if (read < 0 || (exception instanceof EOFException)) { |
|
||||
endOfStream = true; |
|
||||
} else if (exception != null) { |
|
||||
readAborted = true; |
|
||||
readException = exception; |
|
||||
} |
|
||||
readInProgress = false; |
|
||||
signalAsyncReadComplete(); |
|
||||
stateChangeLock.unlock(); |
|
||||
closeUnderlyingInputStreamIfNecessary(); |
|
||||
} |
|
||||
}); |
|
||||
} |
|
||||
|
|
||||
private void closeUnderlyingInputStreamIfNecessary() { |
|
||||
boolean needToCloseUnderlyingInputStream = false; |
|
||||
stateChangeLock.lock(); |
|
||||
try { |
|
||||
isReading = false; |
|
||||
if (isClosed && !isUnderlyingInputStreamBeingClosed) { |
|
||||
// close method cannot close underlyingInputStream because we were reading. |
|
||||
needToCloseUnderlyingInputStream = true; |
|
||||
} |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
if (needToCloseUnderlyingInputStream) { |
|
||||
try { |
|
||||
underlyingInputStream.close(); |
|
||||
} catch (IOException e) { |
|
||||
logger.warn(e.getMessage(), e); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void signalAsyncReadComplete() { |
|
||||
stateChangeLock.lock(); |
|
||||
try { |
|
||||
asyncReadComplete.signalAll(); |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void waitForAsyncReadComplete() throws IOException { |
|
||||
stateChangeLock.lock(); |
|
||||
isWaiting.set(true); |
|
||||
try { |
|
||||
// There is only one reader, and one writer, so the writer should signal only once, |
|
||||
// but a while loop checking the wake up condition is still needed to avoid spurious wakeups. |
|
||||
while (readInProgress) { |
|
||||
asyncReadComplete.await(); |
|
||||
} |
|
||||
} catch (InterruptedException e) { |
|
||||
InterruptedIOException iio = new InterruptedIOException(e.getMessage()); |
|
||||
iio.initCause(e); |
|
||||
throw iio; |
|
||||
} finally { |
|
||||
isWaiting.set(false); |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
checkReadException(); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public int read() throws IOException { |
|
||||
if (activeBuffer.hasRemaining()) { |
|
||||
// short path - just get one byte. |
|
||||
return activeBuffer.get() & 0xFF; |
|
||||
} else { |
|
||||
byte[] oneByteArray = oneByte.get(); |
|
||||
return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public int read(byte[] b, int offset, int len) throws IOException { |
|
||||
if (offset < 0 || len < 0 || len > b.length - offset) { |
|
||||
throw new IndexOutOfBoundsException(); |
|
||||
} |
|
||||
if (len == 0) { |
|
||||
return 0; |
|
||||
} |
|
||||
|
|
||||
if (!activeBuffer.hasRemaining()) { |
|
||||
// No remaining in active buffer - lock and switch to write ahead buffer. |
|
||||
stateChangeLock.lock(); |
|
||||
try { |
|
||||
waitForAsyncReadComplete(); |
|
||||
if (!readAheadBuffer.hasRemaining()) { |
|
||||
// The first read. |
|
||||
readAsync(); |
|
||||
waitForAsyncReadComplete(); |
|
||||
if (isEndOfStream()) { |
|
||||
return -1; |
|
||||
} |
|
||||
} |
|
||||
// Swap the newly read read ahead buffer in place of empty active buffer. |
|
||||
swapBuffers(); |
|
||||
// After swapping buffers, trigger another async read for read ahead buffer. |
|
||||
readAsync(); |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
} |
|
||||
len = Math.min(len, activeBuffer.remaining()); |
|
||||
activeBuffer.get(b, offset, len); |
|
||||
|
|
||||
return len; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* flip the active and read ahead buffer |
|
||||
*/ |
|
||||
private void swapBuffers() { |
|
||||
ByteBuffer temp = activeBuffer; |
|
||||
activeBuffer = readAheadBuffer; |
|
||||
readAheadBuffer = temp; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public int available() throws IOException { |
|
||||
stateChangeLock.lock(); |
|
||||
// Make sure we have no integer overflow. |
|
||||
try { |
|
||||
return (int) Math.min((long) Integer.MAX_VALUE, |
|
||||
(long) activeBuffer.remaining() + readAheadBuffer.remaining()); |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public long skip(long n) throws IOException { |
|
||||
if (n <= 0L) { |
|
||||
return 0L; |
|
||||
} |
|
||||
if (n <= activeBuffer.remaining()) { |
|
||||
// Only skipping from active buffer is sufficient |
|
||||
activeBuffer.position((int) n + activeBuffer.position()); |
|
||||
return n; |
|
||||
} |
|
||||
stateChangeLock.lock(); |
|
||||
long skipped; |
|
||||
try { |
|
||||
skipped = skipInternal(n); |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
return skipped; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Internal skip function which should be called only from skip() api. The assumption is that |
|
||||
* the stateChangeLock is already acquired in the caller before calling this function. |
|
||||
*/ |
|
||||
private long skipInternal(long n) throws IOException { |
|
||||
assert (stateChangeLock.isLocked()); |
|
||||
waitForAsyncReadComplete(); |
|
||||
if (isEndOfStream()) { |
|
||||
return 0; |
|
||||
} |
|
||||
if (available() >= n) { |
|
||||
// we can skip from the internal buffers |
|
||||
int toSkip = (int) n; |
|
||||
// We need to skip from both active buffer and read ahead buffer |
|
||||
toSkip -= activeBuffer.remaining(); |
|
||||
assert(toSkip > 0); // skipping from activeBuffer already handled. |
|
||||
activeBuffer.position(0); |
|
||||
activeBuffer.flip(); |
|
||||
readAheadBuffer.position(toSkip + readAheadBuffer.position()); |
|
||||
swapBuffers(); |
|
||||
// Trigger async read to emptied read ahead buffer. |
|
||||
readAsync(); |
|
||||
return n; |
|
||||
} else { |
|
||||
int skippedBytes = available(); |
|
||||
long toSkip = n - skippedBytes; |
|
||||
activeBuffer.position(0); |
|
||||
activeBuffer.flip(); |
|
||||
readAheadBuffer.position(0); |
|
||||
readAheadBuffer.flip(); |
|
||||
long skippedFromInputStream = underlyingInputStream.skip(toSkip); |
|
||||
readAsync(); |
|
||||
return skippedBytes + skippedFromInputStream; |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void close() throws IOException { |
|
||||
boolean isSafeToCloseUnderlyingInputStream = false; |
|
||||
stateChangeLock.lock(); |
|
||||
try { |
|
||||
if (isClosed) { |
|
||||
return; |
|
||||
} |
|
||||
isClosed = true; |
|
||||
if (!isReading) { |
|
||||
// Nobody is reading, so we can close the underlying input stream in this method. |
|
||||
isSafeToCloseUnderlyingInputStream = true; |
|
||||
// Flip this to make sure the read ahead task will not close the underlying input stream. |
|
||||
isUnderlyingInputStreamBeingClosed = true; |
|
||||
} |
|
||||
} finally { |
|
||||
stateChangeLock.unlock(); |
|
||||
} |
|
||||
|
|
||||
try { |
|
||||
executorService.shutdownNow(); |
|
||||
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); |
|
||||
} catch (InterruptedException e) { |
|
||||
InterruptedIOException iio = new InterruptedIOException(e.getMessage()); |
|
||||
iio.initCause(e); |
|
||||
throw iio; |
|
||||
} finally { |
|
||||
if (isSafeToCloseUnderlyingInputStream) { |
|
||||
underlyingInputStream.close(); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue