From 53190a997261a1d2a1d98d1f64eaea2d6a82124e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:05:49 -0700 Subject: [PATCH 01/26] fix compilation --- .../src/main/java/seaweedfs/client/FileChunkManifest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 1248ff13f..79e8d9bc4 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -76,8 +76,7 @@ public class FileChunkManifest { LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - if(chunk.getIsChunkManifest()){ - // only cache manifest chunks + if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); } From b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:06:09 -0700 Subject: [PATCH 02/26] add read ahead input stream --- .../client/ReadAheadInputStream.java | 404 ++++++++++++++++++ .../java/seaweed/hdfs/SeaweedFileSystem.java | 3 +- .../java/seaweed/hdfs/SeaweedFileSystem.java | 3 +- 3 files changed, 408 insertions(+), 2 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java new file mode 100644 index 000000000..52c7ac09c --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java @@ -0,0 +1,404 @@ +package seaweedfs.client; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by + * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data + * which should be returned when a read() call is issued. 
The read ahead buffer is used to + * asynchronously read from the underlying input stream and once the current active buffer is + * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer + * without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Throwable readException; + + @GuardedBy("stateChangeLock") + // whether the close method is called. + private boolean isClosed; + + @GuardedBy("stateChangeLock") + // true when the close method will close the underlying input stream. This is valid only if + // `isClosed` is true. + private boolean isUnderlyingInputStreamBeingClosed; + + @GuardedBy("stateChangeLock") + // whether there is a read ahead task running, + private boolean isReading; + + // whether there is a reader waiting for data. + private AtomicBoolean isWaiting = new AtomicBoolean(false); + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() + ); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + */ + public ReadAheadInputStream( + InputStream inputStream, int bufferSizeInBytes) { + Preconditions.checkArgument(bufferSizeInBytes > 0, + "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); + activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); + readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); + this.underlyingInputStream = inputStream; + activeBuffer.flip(); + readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { + return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); + } + + private void checkReadException() throws IOException { + if (readAborted) { + Throwables.propagateIfPossible(readException, IOException.class); + throw new IOException(readException); + } + } + + /** Read data from underlyingInputStream to readAheadBuffer asynchronously. 
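+   * The caller is not blocked on I/O: this method prepares the read ahead buffer,
+   * marks readInProgress under stateChangeLock, and hands the actual read to the
+   * single-thread executor.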
*/ + private void readAsync() throws IOException { + stateChangeLock.lock(); + final byte[] arr = readAheadBuffer.array(); + try { + if (endOfStream || readInProgress) { + return; + } + checkReadException(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + readInProgress = true; + } finally { + stateChangeLock.unlock(); + } + executorService.execute(() -> { + stateChangeLock.lock(); + try { + if (isClosed) { + readInProgress = false; + return; + } + // Flip this so that the close method will not close the underlying input stream when we + // are reading. + isReading = true; + } finally { + stateChangeLock.unlock(); + } + + // Please note that it is safe to release the lock and read into the read ahead buffer + // because either of following two conditions will hold - 1. The active buffer has + // data available to read so the reader will not read from the read ahead buffer. + // 2. This is the first time read is called or the active buffer is exhausted, + // in that case the reader waits for this async read to complete. + // So there is no race condition in both the situations. + int read = 0; + int off = 0, len = arr.length; + Throwable exception = null; + try { + // try to fill the read ahead buffer. + // if a reader is waiting, possibly return early. + do { + read = underlyingInputStream.read(arr, off, len); + if (read <= 0) break; + off += read; + len -= read; + } while (len > 0 && !isWaiting.get()); + } catch (Throwable ex) { + exception = ex; + if (ex instanceof Error) { + // `readException` may not be reported to the user. Rethrow Error to make sure at least + // The user can see Error in UncaughtExceptionHandler. + throw (Error) ex; + } + } finally { + stateChangeLock.lock(); + readAheadBuffer.limit(off); + if (read < 0 || (exception instanceof EOFException)) { + endOfStream = true; + } else if (exception != null) { + readAborted = true; + readException = exception; + } + readInProgress = false; + signalAsyncReadComplete(); + stateChangeLock.unlock(); + closeUnderlyingInputStreamIfNecessary(); + } + }); + } + + private void closeUnderlyingInputStreamIfNecessary() { + boolean needToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + isReading = false; + if (isClosed && !isUnderlyingInputStreamBeingClosed) { + // close method cannot close underlyingInputStream because we were reading. + needToCloseUnderlyingInputStream = true; + } + } finally { + stateChangeLock.unlock(); + } + if (needToCloseUnderlyingInputStream) { + try { + underlyingInputStream.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + } + + private void signalAsyncReadComplete() { + stateChangeLock.lock(); + try { + asyncReadComplete.signalAll(); + } finally { + stateChangeLock.unlock(); + } + } + + private void waitForAsyncReadComplete() throws IOException { + stateChangeLock.lock(); + isWaiting.set(true); + try { + // There is only one reader, and one writer, so the writer should signal only once, + // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. + while (readInProgress) { + asyncReadComplete.await(); + } + } catch (InterruptedException e) { + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + isWaiting.set(false); + stateChangeLock.unlock(); + } + checkReadException(); + } + + @Override + public int read() throws IOException { + if (activeBuffer.hasRemaining()) { + // short path - just get one byte. 
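+      // Masking with 0xFF returns the byte as an unsigned int in [0, 255];
+      // -1 remains reserved for end of stream.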
+ return activeBuffer.get() & 0xFF; + } else { + byte[] oneByteArray = oneByte.get(); + return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; + } + } + + @Override + public int read(byte[] b, int offset, int len) throws IOException { + if (offset < 0 || len < 0 || len > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + + if (!activeBuffer.hasRemaining()) { + // No remaining in active buffer - lock and switch to write ahead buffer. + stateChangeLock.lock(); + try { + waitForAsyncReadComplete(); + if (!readAheadBuffer.hasRemaining()) { + // The first read. + readAsync(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return -1; + } + } + // Swap the newly read read ahead buffer in place of empty active buffer. + swapBuffers(); + // After swapping buffers, trigger another async read for read ahead buffer. + readAsync(); + } finally { + stateChangeLock.unlock(); + } + } + len = Math.min(len, activeBuffer.remaining()); + activeBuffer.get(b, offset, len); + + return len; + } + + /** + * flip the active and read ahead buffer + */ + private void swapBuffers() { + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + + @Override + public int available() throws IOException { + stateChangeLock.lock(); + // Make sure we have no integer overflow. + try { + return (int) Math.min((long) Integer.MAX_VALUE, + (long) activeBuffer.remaining() + readAheadBuffer.remaining()); + } finally { + stateChangeLock.unlock(); + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0L) { + return 0L; + } + if (n <= activeBuffer.remaining()) { + // Only skipping from active buffer is sufficient + activeBuffer.position((int) n + activeBuffer.position()); + return n; + } + stateChangeLock.lock(); + long skipped; + try { + skipped = skipInternal(n); + } finally { + stateChangeLock.unlock(); + } + return skipped; + } + + /** + * Internal skip function which should be called only from skip() api. The assumption is that + * the stateChangeLock is already acquired in the caller before calling this function. + */ + private long skipInternal(long n) throws IOException { + assert (stateChangeLock.isLocked()); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return 0; + } + if (available() >= n) { + // we can skip from the internal buffers + int toSkip = (int) n; + // We need to skip from both active buffer and read ahead buffer + toSkip -= activeBuffer.remaining(); + assert(toSkip > 0); // skipping from activeBuffer already handled. + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(toSkip + readAheadBuffer.position()); + swapBuffers(); + // Trigger async read to emptied read ahead buffer. + readAsync(); + return n; + } else { + int skippedBytes = available(); + long toSkip = n - skippedBytes; + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + long skippedFromInputStream = underlyingInputStream.skip(toSkip); + readAsync(); + return skippedBytes + skippedFromInputStream; + } + } + + @Override + public void close() throws IOException { + boolean isSafeToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + if (isClosed) { + return; + } + isClosed = true; + if (!isReading) { + // Nobody is reading, so we can close the underlying input stream in this method. 
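+        // If a read-ahead task were still running, the underlying stream would instead
+        // be closed by closeUnderlyingInputStreamIfNecessary() when that task finishes.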
+ isSafeToCloseUnderlyingInputStream = true; + // Flip this to make sure the read ahead task will not close the underlying input stream. + isUnderlyingInputStreamBeingClosed = true; + } + } finally { + stateChangeLock.unlock(); + } + + try { + executorService.shutdownNow(); + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + if (isSafeToCloseUnderlyingInputStream) { + underlyingInputStream.close(); + } + } + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fd8877806..836bb4db5 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; +import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fd8877806..836bb4db5 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,6 +11,7 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; +import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -76,7 +77,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; From 15dc0a704db7aa542471b56f10ceb749dc041b12 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Aug 2020 09:11:24 -0700 Subject: [PATCH 03/26] Revert "add read ahead input stream" This reverts commit b3089dcc8eaf9b1018bab68bb64e4fa3af6f4bd6. 
--- .../client/ReadAheadInputStream.java | 404 ------------------ .../java/seaweed/hdfs/SeaweedFileSystem.java | 3 +- .../java/seaweed/hdfs/SeaweedFileSystem.java | 3 +- 3 files changed, 2 insertions(+), 408 deletions(-) delete mode 100644 other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java deleted file mode 100644 index 52c7ac09c..000000000 --- a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java +++ /dev/null @@ -1,404 +0,0 @@ -package seaweedfs.client; - -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -// package org.apache.spark.io; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.GuardedBy; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * {@link InputStream} implementation which asynchronously reads ahead from the underlying input - * stream when specified amount of data has been read from the current buffer. It does it by - * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data - * which should be returned when a read() call is issued. The read ahead buffer is used to - * asynchronously read from the underlying input stream and once the current active buffer is - * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer - * without being blocked in disk I/O. - */ -public class ReadAheadInputStream extends InputStream { - - private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); - - private ReentrantLock stateChangeLock = new ReentrantLock(); - - @GuardedBy("stateChangeLock") - private ByteBuffer activeBuffer; - - @GuardedBy("stateChangeLock") - private ByteBuffer readAheadBuffer; - - @GuardedBy("stateChangeLock") - private boolean endOfStream; - - @GuardedBy("stateChangeLock") - // true if async read is in progress - private boolean readInProgress; - - @GuardedBy("stateChangeLock") - // true if read is aborted due to an exception in reading from underlying input stream. - private boolean readAborted; - - @GuardedBy("stateChangeLock") - private Throwable readException; - - @GuardedBy("stateChangeLock") - // whether the close method is called. - private boolean isClosed; - - @GuardedBy("stateChangeLock") - // true when the close method will close the underlying input stream. 
This is valid only if - // `isClosed` is true. - private boolean isUnderlyingInputStreamBeingClosed; - - @GuardedBy("stateChangeLock") - // whether there is a read ahead task running, - private boolean isReading; - - // whether there is a reader waiting for data. - private AtomicBoolean isWaiting = new AtomicBoolean(false); - - private final InputStream underlyingInputStream; - - private final ExecutorService executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() - ); - - private final Condition asyncReadComplete = stateChangeLock.newCondition(); - - private static final ThreadLocal oneByte = ThreadLocal.withInitial(() -> new byte[1]); - - /** - * Creates a ReadAheadInputStream with the specified buffer size and read-ahead - * threshold - * - * @param inputStream The underlying input stream. - * @param bufferSizeInBytes The buffer size. - */ - public ReadAheadInputStream( - InputStream inputStream, int bufferSizeInBytes) { - Preconditions.checkArgument(bufferSizeInBytes > 0, - "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); - activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); - readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); - this.underlyingInputStream = inputStream; - activeBuffer.flip(); - readAheadBuffer.flip(); - } - - private boolean isEndOfStream() { - return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); - } - - private void checkReadException() throws IOException { - if (readAborted) { - Throwables.propagateIfPossible(readException, IOException.class); - throw new IOException(readException); - } - } - - /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ - private void readAsync() throws IOException { - stateChangeLock.lock(); - final byte[] arr = readAheadBuffer.array(); - try { - if (endOfStream || readInProgress) { - return; - } - checkReadException(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - readInProgress = true; - } finally { - stateChangeLock.unlock(); - } - executorService.execute(() -> { - stateChangeLock.lock(); - try { - if (isClosed) { - readInProgress = false; - return; - } - // Flip this so that the close method will not close the underlying input stream when we - // are reading. - isReading = true; - } finally { - stateChangeLock.unlock(); - } - - // Please note that it is safe to release the lock and read into the read ahead buffer - // because either of following two conditions will hold - 1. The active buffer has - // data available to read so the reader will not read from the read ahead buffer. - // 2. This is the first time read is called or the active buffer is exhausted, - // in that case the reader waits for this async read to complete. - // So there is no race condition in both the situations. - int read = 0; - int off = 0, len = arr.length; - Throwable exception = null; - try { - // try to fill the read ahead buffer. - // if a reader is waiting, possibly return early. - do { - read = underlyingInputStream.read(arr, off, len); - if (read <= 0) break; - off += read; - len -= read; - } while (len > 0 && !isWaiting.get()); - } catch (Throwable ex) { - exception = ex; - if (ex instanceof Error) { - // `readException` may not be reported to the user. Rethrow Error to make sure at least - // The user can see Error in UncaughtExceptionHandler. 
- throw (Error) ex; - } - } finally { - stateChangeLock.lock(); - readAheadBuffer.limit(off); - if (read < 0 || (exception instanceof EOFException)) { - endOfStream = true; - } else if (exception != null) { - readAborted = true; - readException = exception; - } - readInProgress = false; - signalAsyncReadComplete(); - stateChangeLock.unlock(); - closeUnderlyingInputStreamIfNecessary(); - } - }); - } - - private void closeUnderlyingInputStreamIfNecessary() { - boolean needToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - isReading = false; - if (isClosed && !isUnderlyingInputStreamBeingClosed) { - // close method cannot close underlyingInputStream because we were reading. - needToCloseUnderlyingInputStream = true; - } - } finally { - stateChangeLock.unlock(); - } - if (needToCloseUnderlyingInputStream) { - try { - underlyingInputStream.close(); - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } - } - } - - private void signalAsyncReadComplete() { - stateChangeLock.lock(); - try { - asyncReadComplete.signalAll(); - } finally { - stateChangeLock.unlock(); - } - } - - private void waitForAsyncReadComplete() throws IOException { - stateChangeLock.lock(); - isWaiting.set(true); - try { - // There is only one reader, and one writer, so the writer should signal only once, - // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. - while (readInProgress) { - asyncReadComplete.await(); - } - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - isWaiting.set(false); - stateChangeLock.unlock(); - } - checkReadException(); - } - - @Override - public int read() throws IOException { - if (activeBuffer.hasRemaining()) { - // short path - just get one byte. - return activeBuffer.get() & 0xFF; - } else { - byte[] oneByteArray = oneByte.get(); - return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; - } - } - - @Override - public int read(byte[] b, int offset, int len) throws IOException { - if (offset < 0 || len < 0 || len > b.length - offset) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - - if (!activeBuffer.hasRemaining()) { - // No remaining in active buffer - lock and switch to write ahead buffer. - stateChangeLock.lock(); - try { - waitForAsyncReadComplete(); - if (!readAheadBuffer.hasRemaining()) { - // The first read. - readAsync(); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return -1; - } - } - // Swap the newly read read ahead buffer in place of empty active buffer. - swapBuffers(); - // After swapping buffers, trigger another async read for read ahead buffer. - readAsync(); - } finally { - stateChangeLock.unlock(); - } - } - len = Math.min(len, activeBuffer.remaining()); - activeBuffer.get(b, offset, len); - - return len; - } - - /** - * flip the active and read ahead buffer - */ - private void swapBuffers() { - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; - } - - @Override - public int available() throws IOException { - stateChangeLock.lock(); - // Make sure we have no integer overflow. 
- try { - return (int) Math.min((long) Integer.MAX_VALUE, - (long) activeBuffer.remaining() + readAheadBuffer.remaining()); - } finally { - stateChangeLock.unlock(); - } - } - - @Override - public long skip(long n) throws IOException { - if (n <= 0L) { - return 0L; - } - if (n <= activeBuffer.remaining()) { - // Only skipping from active buffer is sufficient - activeBuffer.position((int) n + activeBuffer.position()); - return n; - } - stateChangeLock.lock(); - long skipped; - try { - skipped = skipInternal(n); - } finally { - stateChangeLock.unlock(); - } - return skipped; - } - - /** - * Internal skip function which should be called only from skip() api. The assumption is that - * the stateChangeLock is already acquired in the caller before calling this function. - */ - private long skipInternal(long n) throws IOException { - assert (stateChangeLock.isLocked()); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return 0; - } - if (available() >= n) { - // we can skip from the internal buffers - int toSkip = (int) n; - // We need to skip from both active buffer and read ahead buffer - toSkip -= activeBuffer.remaining(); - assert(toSkip > 0); // skipping from activeBuffer already handled. - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(toSkip + readAheadBuffer.position()); - swapBuffers(); - // Trigger async read to emptied read ahead buffer. - readAsync(); - return n; - } else { - int skippedBytes = available(); - long toSkip = n - skippedBytes; - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - long skippedFromInputStream = underlyingInputStream.skip(toSkip); - readAsync(); - return skippedBytes + skippedFromInputStream; - } - } - - @Override - public void close() throws IOException { - boolean isSafeToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - if (isClosed) { - return; - } - isClosed = true; - if (!isReading) { - // Nobody is reading, so we can close the underlying input stream in this method. - isSafeToCloseUnderlyingInputStream = true; - // Flip this to make sure the read ahead task will not close the underlying input stream. 
- isUnderlyingInputStreamBeingClosed = true; - } - } finally { - stateChangeLock.unlock(); - } - - try { - executorService.shutdownNow(); - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - if (isSafeToCloseUnderlyingInputStream) { - underlyingInputStream.close(); - } - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 836bb4db5..fd8877806 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; -import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 836bb4db5..fd8877806 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; -import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; From cbd80253e33688f55c02dd29c994a3ee6eac3d6c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 4 Aug 2020 22:09:07 -0700 Subject: [PATCH 04/26] better needle id format --- weed/storage/needle/needle_read_write.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 9702cf939..575a72e40 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, ver func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) { n.ParseNeedleHeader(bytes) if n.Size != size { - return fmt.Errorf("entry not found: offset %d found id %d size %d, expected size %d", offset, n.Id, n.Size, size) + return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) } switch version { 
case Version1: From 2b74abf7661faaeca944cc1ec3a5bf4c85cc58b7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 03:41:34 -0700 Subject: [PATCH 05/26] S3: configurable access for anonymous user fix https://github.com/chrislusf/seaweedfs/issues/1413 --- weed/s3api/auth_credentials.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index db5f4c8a3..851f6d4a3 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -107,6 +107,16 @@ func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identi return nil, nil, false } +func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, found bool) { + + for _, ident := range iam.identities { + if ident.Name == "anonymous" { + return ident, true + } + } + return nil, false +} + func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc { if !iam.isEnabled() { @@ -127,6 +137,7 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) ErrorCode { var identity *Identity var s3Err ErrorCode + var found bool switch getRequestAuthType(r) { case authTypeStreamingSigned: return ErrNone @@ -146,7 +157,10 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) glog.V(3).Infof("jwt auth type") return ErrNotImplemented case authTypeAnonymous: - return ErrAccessDenied + identity, found = iam.lookupAnonymous() + if !found { + return ErrAccessDenied + } default: return ErrNotImplemented } From 41007ced77742d31b6ac4df3221578716a6bf882 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 04:26:29 -0700 Subject: [PATCH 06/26] remove logging --- weed/filer2/stream.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index c7df007ec..e9707d3ae 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,7 +2,6 @@ package filer2 import ( "bytes" - "fmt" "io" "math" "strings" @@ -15,7 +14,7 @@ import ( func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - fmt.Printf("start to stream content for chunks: %+v\n", chunks) + // fmt.Printf("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) fileId2Url := make(map[string]string) From 4703a3daad1cf72a740b1883af72d39dc12fb735 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 04:32:05 -0700 Subject: [PATCH 07/26] add an example --- .../s3/presigned_put/presigned_put.go | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 unmaintained/s3/presigned_put/presigned_put.go diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go new file mode 100644 index 000000000..e8368d124 --- /dev/null +++ b/unmaintained/s3/presigned_put/presigned_put.go @@ -0,0 +1,73 @@ +package main + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "encoding/base64" + "fmt" + "crypto/md5" + "strings" + "time" + "net/http" +) + +// Downloads an item from an S3 Bucket in the region configured in the shared config +// or AWS_REGION environment variable. 
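+// This example presigns a PUT URL that includes a Content-MD5 condition, then
+// uploads the payload with the matching Content-MD5 header; a body with a
+// different checksum would be rejected.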
+//
+// Usage:
+//     go run presigned_put.go
+// For this example to work, the domainName is needed:
+//     weed s3 -domainName=localhost
+func main() {
+	h := md5.New()
+	content := strings.NewReader(stringContent)
+	content.WriteTo(h)
+
+	// Initialize a session in us-east-1 that the SDK will use to load
+	// credentials from the shared credentials file ~/.aws/credentials.
+	sess, err := session.NewSession(&aws.Config{
+		Region:   aws.String("us-east-1"),
+		Endpoint: aws.String("http://localhost:8333"),
+	})
+
+	// Create S3 service client
+	svc := s3.New(sess)
+
+	putRequest, output := svc.PutObjectRequest(&s3.PutObjectInput{
+		Bucket: aws.String("dev"),
+		Key:    aws.String("testKey"),
+	})
+	fmt.Printf("output: %+v\n", output)
+
+	md5s := base64.StdEncoding.EncodeToString(h.Sum(nil))
+	putRequest.HTTPRequest.Header.Set("Content-MD5", md5s)
+
+	url, err := putRequest.Presign(15 * time.Minute)
+	if err != nil {
+		fmt.Println("error presigning request", err)
+		return
+	}
+
+	fmt.Println(url)
+
+	req, err := http.NewRequest("PUT", url, strings.NewReader(stringContent))
+	if err != nil {
+		fmt.Println("error creating request", url)
+		return
+	}
+	req.Header.Set("Content-MD5", md5s)
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		fmt.Printf("error put request: %v\n", err)
+		return
+	}
+	fmt.Printf("response: %+v\n", resp)
+}
+
+var stringContent = `Generate a Pre-Signed URL for an Amazon S3 PUT Operation with a Specific Payload
+You can generate a pre-signed URL for a PUT operation that checks whether users upload the correct content. When the SDK pre-signs a request, it computes the checksum of the request body and generates an MD5 checksum that is included in the pre-signed URL. Users must upload the same content that produces the same MD5 checksum generated by the SDK; otherwise, the operation fails. This is not the Content-MD5, but the signature. To enforce Content-MD5, simply add the header to the request.
+
+The following example adds a Body field to generate a pre-signed PUT operation that requires a specific payload to be uploaded by users.
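+
+(Note: the exact payload text is irrelevant; main() computes the MD5 from whatever bytes stringContent holds, so the presigned URL and the upload stay consistent.)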
+` \ No newline at end of file From 4ecfa9879d8f7c70b4515ac31440a111ae274dc3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 05:22:53 -0700 Subject: [PATCH 08/26] volume: report Content-MD5 in response header --- weed/operation/needle_parse_test.go | 2 +- weed/server/volume_server_handlers_write.go | 3 ++- weed/storage/needle/needle.go | 2 +- weed/storage/needle/needle_parse_upload.go | 11 +++++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 74d58d1b5..177c620f4 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -18,7 +18,7 @@ type MockClient struct { } func (m *MockClient) Do(req *http.Request) (*http.Response, error) { - n, originalSize, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) + n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) if m.needleHandling != nil { m.needleHandling(n, originalSize, err) } diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 74dad28de..b4f8a90b2 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) + reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return @@ -70,6 +70,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.ETag = reqNeedle.Etag() ret.Mime = string(reqNeedle.Mime) setEtag(w, ret.ETag) + w.Header().Set("Content-MD5", contentMd5) writeJsonQuiet(w, r, httpStatus, ret) } diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 150d6ee4b..eb1d9537b 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -48,7 +48,7 @@ func (n *Needle) String() (str string) { return } -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) { +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, md5 string, e error) { n = new(Needle) pu, e := ParseUpload(r, sizeLimit) if e != nil { diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index dd678f87f..d97ca3101 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -29,6 +29,7 @@ type ParsedUpload struct { Ttl *TTL IsChunkedFile bool UncompressedData []byte + ContentMd5 string } func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { @@ -83,11 +84,13 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { } } + // md5 + h := md5.New() + h.Write(pu.UncompressedData) + pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil)) if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" { - h := md5.New() - h.Write(pu.UncompressedData) - if receivedChecksum := base64.StdEncoding.EncodeToString(h.Sum(nil)); expectedChecksum != receivedChecksum { - e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, receivedChecksum) + if expectedChecksum != 
pu.ContentMd5 { + e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, pu.ContentMd5) return } } From 93ea0801ea65375c16f148c9e77056e6c145f770 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 09:48:54 -0700 Subject: [PATCH 09/26] volume: the variable for the master node may be stale? related to https://github.com/chrislusf/seaweedfs/issues/1414 --- weed/server/master_grpc_server.go | 19 +++++++++---------- weed/server/volume_grpc_client_to_master.go | 20 +++----------------- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1ee214deb..d310a27d4 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -19,14 +19,13 @@ import ( func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { var dn *topology.DataNode - t := ms.Topo defer func() { if dn != nil { // if the volume server disconnects and reconnects quickly // the unregister and register can race with each other - t.UnRegisterDataNode(dn) + ms.Topo.UnRegisterDataNode(dn) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) message := &master_pb.VolumeLocation{ @@ -62,11 +61,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } - t.Sequence.SetMax(heartbeat.MaxFileKey) + ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { - dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) - dc := t.GetOrCreateDataCenter(dcName) + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, @@ -102,12 +101,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ message.DeletedVids = append(message.DeletedVids, volInfo.Id) } // update master internal volume layouts - t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) + ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { // process heartbeat.Volumes - newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) for _, v := range newVolumes { glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url()) @@ -122,7 +121,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 { // update master internal volume layouts - t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) + ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) for _, s := range heartbeat.NewEcShards { message.NewVids = append(message.NewVids, s.Id) @@ -138,7 +137,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards { glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) - newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) + newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, 
dn) // broadcast the ec vid changes to master clients for _, s := range newShards { @@ -163,7 +162,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } // tell the volume servers about the leader - newLeader, err := t.Leader() + newLeader, err := ms.Topo.Leader() if err != nil { glog.Warningf("SendHeartbeat find leader: %v", err) return err diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 7cb836344..694a9fb31 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,7 +2,6 @@ package weed_server import ( "fmt" - "net" "time" "google.golang.org/grpc" @@ -87,12 +86,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) if vs.store.MaybeAdjustVolumeMax() { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err) } } } - if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) { - glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) + if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() { + glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster) newLeader = in.GetLeader() doneChan <- nil return @@ -185,16 +184,3 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } } } - -func isSameIP(ip string, host string) bool { - ips, err := net.LookupIP(host) - if err != nil { - return false - } - for _, t := range ips { - if ip == t.String() { - return true - } - } - return false -} From 20e2ac1add0e93710d54f41c8ba142918c60d620 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 6 Aug 2020 10:04:17 -0700 Subject: [PATCH 10/26] filer: store md5 metadata for files uploaded by filer fix https://github.com/chrislusf/seaweedfs/issues/1412 --- weed/filesys/filehandle.go | 9 +++++-- weed/filesys/wfs.go | 2 +- weed/operation/upload_content.go | 26 +++++++------------ weed/server/filer_server_handlers_write.go | 21 +++++---------- .../filer_server_handlers_write_cipher.go | 1 + weed/storage/needle/needle.go | 3 ++- weed/util/bytes.go | 17 ++++++++++-- 7 files changed, 41 insertions(+), 38 deletions(-) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ca35bfd02..b9d224fb2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -9,11 +9,12 @@ import ( "os" "time" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type FileHandle struct { @@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil + // special handling of one chunk md5 + if len(chunks) == 1 { + } + if err := filer_pb.CreateEntry(client, request); err != nil { glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 68ad987be..9ef597024 100644 --- 
a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -82,7 +82,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, }, } - cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] + cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { os.MkdirAll(cacheDir, 0755) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index cb129daa2..6fd8a60d1 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,7 +2,6 @@ package operation import ( "bytes" - "crypto/md5" "encoding/json" "errors" "fmt" @@ -23,14 +22,14 @@ import ( ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` - CipherKey []byte `json:"cipherKey,omitempty"` - Mime string `json:"mime,omitempty"` - Gzip uint32 `json:"gzip,omitempty"` - Md5 string `json:"md5,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + ContentMd5 string `json:"contentMd5,omitempty"` } func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { @@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") // Upload sends a POST request to a volume server to upload the content with adjustable compression level func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = util.Md5(data) - } return } // Upload sends a POST request to a volume server to upload the content with fast compression func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { - hash := md5.New() - reader = io.TeeReader(reader, hash) uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) - } return } @@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, errors.New(ret.Error) } ret.ETag = etag + ret.ContentMd5 = resp.Header.Get("Content-MD5") return &ret, nil } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index da66178ce..c7833a85e 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,7 +2,6 @@ package weed_server import ( "context" - "crypto/md5" "encoding/json" "errors" "fmt" @@ -124,12 +123,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) u, _ := url.Parse(urlLocation) - ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + ret, err := 
fs.uploadToVolumeServer(r, u, auth, w, fileId) if err != nil { return } - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil { + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil { return } @@ -147,7 +146,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { // update metadata in filer store func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) { + collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) { stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() start := time.Now() @@ -188,7 +187,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w Collection: collection, TtlSec: ttlSeconds, Mime: ret.Mime, - Md5: md5value, + Md5: util.Base64Md5ToBytes(ret.ContentMd5), }, Chunks: []*filer_pb.FileChunk{{ FileId: fileId, @@ -215,7 +214,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) { +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) { stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() start := time.Now() @@ -223,12 +222,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se ret = &operation.UploadResult{} - md5Hash := md5.New() body := r.Body - if r.Method == "PUT" { - // only PUT or large chunked files has Md5 in attributes - body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash)) - } request := &http.Request{ Method: r.Method, @@ -292,11 +286,8 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se return } } - // use filer calculated md5 ETag, instead of the volume server crc ETag - if r.Method == "PUT" { - md5value = md5Hash.Sum(nil) - } ret.ETag = getEtag(resp) + ret.ContentMd5 = resp.Header.Get("Content-MD5") return } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 8413496b8..6ec06d3de 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Collection: collection, TtlSec: ttlSeconds, Mime: pu.MimeType, + Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, Chunks: fileChunks, } diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index eb1d9537b..7c7aa3feb 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -48,7 +48,7 @@ func (n *Needle) String() (str string) { return } -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, md5 string, e error) { +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) { n = new(Needle) pu, e := ParseUpload(r, sizeLimit) if e != nil { @@ -58,6 +58,7 @@ func 
CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
 	originalSize = pu.OriginalDataSize
 	n.LastModified = pu.ModifiedTime
 	n.Ttl = pu.Ttl
+	contentMd5 = pu.ContentMd5
 
 	if len(pu.FileName) < 256 {
 		n.Name = []byte(pu.FileName)
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 0650919c0..5076c3e67 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"crypto/md5"
+	"encoding/base64"
 	"fmt"
 	"io"
 )
@@ -109,8 +110,20 @@ func HashToInt32(data []byte) (v int32) {
 	return
 }
 
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+	return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
 	hash := md5.New()
 	hash.Write(data)
-	return fmt.Sprintf("%x", hash.Sum(nil))
+	return Base64Encode(hash.Sum(nil))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+	data, err := base64.StdEncoding.DecodeString(contentMd5)
+	if err != nil {
+		return nil
+	}
+	return data
+}

From fcb0ff9890b90b4338ab511f88ce37452d5e708e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 6 Aug 2020 22:22:14 -0700
Subject: [PATCH 11/26] go mod tidy

---
 go.mod | 1 -
 go.sum | 4 ----
 2 files changed, 5 deletions(-)

diff --git a/go.mod b/go.mod
index e5fc3bbfd..cdb951f9c 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,6 @@ require (
 	cloud.google.com/go v0.44.3
 	github.com/Azure/azure-pipeline-go v0.2.2 // indirect
 	github.com/Azure/azure-storage-blob-go v0.8.0
-	github.com/DataDog/zstd v1.4.1 // indirect
 	github.com/OneOfOne/xxhash v1.2.2
 	github.com/Shopify/sarama v1.23.1
 	github.com/aws/aws-sdk-go v1.23.13
diff --git a/go.sum b/go.sum
index 28461324e..bc37db039 100644
--- a/go.sum
+++ b/go.sum
@@ -32,8 +32,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
-github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
-github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190605020000-c4ba1fdf4d36/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -234,8 +232,6 @@ github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
-github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
-github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
 github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=

From 828a5ae429e5fc7d3bac692184b3b2ac1edea410 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 8 Aug 2020 09:11:40 -0700
Subject:
[PATCH 12/26] check signature only when auth is enabled --- weed/s3api/s3api_object_handlers.go | 28 ++++++++++--------- weed/s3api/s3api_object_multipart_handlers.go | 28 ++++++++++--------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 357ac9ce0..84d685fa8 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -40,20 +40,22 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } - rAuthType := getRequestAuthType(r) dataReader := r.Body - var s3ErrCode ErrorCode - switch rAuthType { - case authTypeStreamingSigned: - dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) - case authTypeSignedV2, authTypePresignedV2: - _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) - case authTypePresigned, authTypeSigned: - _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) - } - if s3ErrCode != ErrNone { - writeErrorResponse(w, s3ErrCode, r.URL) - return + if s3a.iam.isEnabled() { + rAuthType := getRequestAuthType(r) + var s3ErrCode ErrorCode + switch rAuthType { + case authTypeStreamingSigned: + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) + case authTypeSignedV2, authTypePresignedV2: + _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) + case authTypePresigned, authTypeSigned: + _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) + } + if s3ErrCode != ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } } defer dataReader.Close() diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 0ed96afa2..7611b1e7e 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -179,20 +179,22 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ return } - rAuthType := getRequestAuthType(r) dataReader := r.Body - var s3ErrCode ErrorCode - switch rAuthType { - case authTypeStreamingSigned: - dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) - case authTypeSignedV2, authTypePresignedV2: - _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) - case authTypePresigned, authTypeSigned: - _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) - } - if s3ErrCode != ErrNone { - writeErrorResponse(w, s3ErrCode, r.URL) - return + if s3a.iam.isEnabled() { + rAuthType := getRequestAuthType(r) + var s3ErrCode ErrorCode + switch rAuthType { + case authTypeStreamingSigned: + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) + case authTypeSignedV2, authTypePresignedV2: + _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) + case authTypePresigned, authTypeSigned: + _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) + } + if s3ErrCode != ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } } defer dataReader.Close() From ab6e5c0dc48230fd781edebe936578590a22695f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 10:18:32 -0700 Subject: [PATCH 13/26] adjust error message --- weed/storage/needle/needle_parse_upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index d97ca3101..4d244046e 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -90,7 +90,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil)) if expectedChecksum := 
r.Header.Get("Content-MD5"); expectedChecksum != "" { if expectedChecksum != pu.ContentMd5 { - e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, pu.ContentMd5) + e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData)) return } } From bd8bfdae0716a4071c55b05b06f943267a35c4b9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 10:18:43 -0700 Subject: [PATCH 14/26] refactoring --- weed/server/filer_server_handlers_write_autochunk.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index be0438efb..c3ce09af6 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -57,7 +57,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * return false } - reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + reply, err := fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -66,7 +66,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * return true } -func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() From ae00cce4bd673363f5f7c36905fbff138bcbd772 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 10:45:37 -0700 Subject: [PATCH 15/26] support POST and PUT auto chunking --- .../filer_server_handlers_write_autochunk.go | 131 ++++++++++++------ 1 file changed, 86 insertions(+), 45 deletions(-) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index c3ce09af6..77ae10f19 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "crypto/md5" + "hash" "io" "io/ioutil" "net/http" @@ -22,10 +23,6 @@ import ( func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { - if r.Method != "POST" { - glog.V(4).Infoln("AutoChunking not supported for method", r.Method) - return false - } // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line query := r.URL.Query() @@ -57,7 +54,19 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * return false } - reply, err := fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + }() + + var reply *FilerPostResult + var err error + if r.Method == "POST" { + reply, err = fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + } else { + reply, err = fs.doPutAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + } if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -69,12 +78,6 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) - }() - multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { return nil, multipartReaderErr @@ -90,46 +93,36 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite fileName = path.Base(fileName) } contentType := part1.Header.Get("Content-Type") + if contentType == "application/octet-stream" { + contentType = "" + } - var fileChunks []*filer_pb.FileChunk - - md5Hash := md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash)) - - chunkOffset := int64(0) - - for chunkOffset < contentLength { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, err + } - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) - if assignErr != nil { - return nil, assignErr - } + fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } - // upload the chunk to the volume server - uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) - if uploadErr != nil { - return nil, uploadErr - } + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } + return +} - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, 
chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, + contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) + fileName := "" + contentType := "" - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break - } + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) @@ -138,6 +131,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) + + return +} + +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5Hash hash.Hash, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -173,10 +172,52 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - return } + return filerResult, replyerr +} - return +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { + var fileChunks []*filer_pb.FileChunk + + md5Hash := md5.New() + var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) + + chunkOffset := int64(0) + + for chunkOffset < contentLength { + limitedReader := io.LimitReader(partReader, int64(chunkSize)) + + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) + if assignErr != nil { + return nil, nil, 0, assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) + if uploadErr != nil { + return nil, nil, 0, uploadErr + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + break + } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) + + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) + + // if last chunk was not at full chunk size, but already exhausted the reader + if int64(uploadResult.Size) < int64(chunkSize) { + break + } + } + return 
fileChunks, md5Hash, chunkOffset, nil } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) { From bee0d7e5eb184f8b60fad623d269b0a65b57cc1e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 10:52:13 -0700 Subject: [PATCH 16/26] lower log priority for noisy heartbeat --- weed/server/master_grpc_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index d310a27d4..579b30a6a 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -87,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dn.UpAdjustMaxVolumeCountDelta(delta) } - glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) + glog.V(5).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, From 67348e7b15be145f7410bb644df89aa1904e4bb2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 10:53:35 -0700 Subject: [PATCH 17/26] less noisy heartbeat logs --- weed/server/volume_grpc_client_to_master.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 694a9fb31..c62a4a388 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -168,13 +168,13 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err } case <-ecShardTickChan: - glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) + glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err From 3b1a95ac26debb3080794bf8605ea2d5636818c7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 12:02:06 -0700 Subject: [PATCH 18/26] filer refactoring: same auto chunking logic for POST and PUT, no size limit --- weed/server/filer_server_handlers_write.go | 202 +----------------- .../filer_server_handlers_write_autochunk.go | 92 ++++---- 2 files changed, 49 insertions(+), 245 deletions(-) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index c7833a85e..d22376a45 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,21 +2,11 @@ package weed_server import ( "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" "net/http" - "net/url" "os" - filenamePath "path" - "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -97,198 +87,8 @@ func (fs *FilerServer) PostHandler(w 
http.ResponseWriter, r *http.Request) { ttlSeconds = int32(ttl.Minutes()) * 60 } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { - return - } - - if fs.option.Cipher { - reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - - return - } - - fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) - - if err != nil || fileId == "" || urlLocation == "" { - glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) - return - } - - glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) - - u, _ := url.Parse(urlLocation) - ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) - if err != nil { - return - } + fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil { - return - } - - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: int64(ret.Size), - Error: ret.Error, - Fid: fileId, - Url: urlLocation, - } - setEtag(w, ret.ETag) - writeJsonQuiet(w, r, http.StatusCreated, reply) -} - -// update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) { - - stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) - }() - - modeStr := r.URL.Query().Get("mode") - if modeStr == "" { - modeStr = "0660" - } - mode, err := strconv.ParseUint(modeStr, 8, 32) - if err != nil { - glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) - mode = 0660 - } - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } - } - existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) - crTime := time.Now() - if err == nil && existingEntry != nil { - crTime = existingEntry.Crtime - } - entry := &filer2.Entry{ - FullPath: util.FullPath(path), - Attr: filer2.Attr{ - Mtime: time.Now(), - Crtime: crTime, - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSeconds, - Mime: ret.Mime, - Md5: util.Base64Md5ToBytes(ret.ContentMd5), - }, - Chunks: []*filer_pb.FileChunk{{ - FileId: fileId, - Size: uint64(ret.Size), - Mtime: time.Now().UnixNano(), - ETag: ret.ETag, - }}, - } - if entry.Attr.Mime == "" { - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) - } - } - // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - writeJsonError(w, r, http.StatusInternalServerError, dbErr) - err = dbErr - return - } - - 
return nil -} - -// send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) { - - stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() - start := time.Now() - defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() - - ret = &operation.UploadResult{} - - body := r.Body - - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: body, - Host: r.Host, - ContentLength: r.ContentLength, - } - - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, doErr := util.Do(request) - if doErr != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, doErr) - err = doErr - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - - respBody, raErr := ioutil.ReadAll(resp.Body) - if raErr != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) - writeJsonError(w, r, http.StatusInternalServerError, raErr) - err = raErr - return - } - - glog.V(4).Infoln("post result", string(respBody)) - unmarshalErr := json.Unmarshal(respBody, &ret) - if unmarshalErr != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) - err = unmarshalErr - return - } - if ret.Error != "" { - err = errors.New(ret.Error) - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - err = fmt.Errorf("can not to write to folder %s without a file name", path) - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - } - ret.ETag = getEtag(resp) - ret.ContentMd5 = resp.Header.Get("Content-MD5") - return } // curl -X DELETE http://localhost:8888/path/to diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 77ae10f19..0365ea3ab 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "path" "strconv" "strings" @@ -22,7 +23,7 @@ import ( ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { + replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) { // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line query := r.URL.Query() @@ -32,28 +33,9 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if maxMB <= 0 && fs.option.MaxMB > 0 { maxMB = int32(fs.option.MaxMB) } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") chunkSize := 1024 * 1024 * maxMB - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } - - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false - } - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() start := time.Now() defer func() { @@ -62,30 +44,32 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * var reply *FilerPostResult var err error + var md5bytes []byte if r.Method == "POST" { - reply, err = fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) } else { - reply, err = fs.doPutAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) } if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { + if len(md5bytes) > 0 { + w.Header().Set("Content-MD5", util.Base64Encode(md5bytes)) + } writeJsonQuiet(w, r, http.StatusCreated, reply) } - return true } -func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { - return nil, multipartReaderErr + return nil, nil, multipartReaderErr } part1, part1Err := multipartReader.NextPart() if part1Err != nil { - return nil, part1Err + return nil, nil, part1Err } fileName := part1.FileName() @@ -97,9 +81,9 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite contentType = "" } - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) if err != nil { - return nil, err + return nil, nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, 
fsync), fileChunks) @@ -108,21 +92,20 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) return } - -func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" contentType := "" - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) if err != nil { - return nil, err + return nil, nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) @@ -131,12 +114,26 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return } - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) return } -func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5Hash hash.Hash, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + + // fix the path path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -144,20 +141,28 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } + // fix the crTime + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + crTime := time.Now() + if err == nil && existingEntry != nil { + crTime = existingEntry.Crtime + } + + glog.V(4).Infoln("saving", path) entry := &filer2.Entry{ FullPath: util.FullPath(path), Attr: filer2.Attr{ Mtime: time.Now(), - Crtime: time.Now(), - Mode: 0660, + Crtime: crTime, + Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, Replication: replication, Collection: collection, 
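// the attribute fields below persist what the chunked upload produced: the requested TTL, the detected content type, and the MD5 digest accumulated while the chunks were streamed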
TtlSec: ttlSec, Mime: contentType, - Md5: md5Hash.Sum(nil), + Md5: md5bytes, }, Chunks: fileChunks, } @@ -176,7 +181,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { var fileChunks []*filer_pb.FileChunk md5Hash := md5.New() @@ -184,7 +189,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset := int64(0) - for chunkOffset < contentLength { + for { limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk @@ -207,7 +212,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // Save to chunk manifest structure fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) // reset variables for the next chunk chunkOffset = chunkOffset + int64(uploadResult.Size) @@ -250,4 +255,3 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil } } - From 9832653e1d3c6a4382b12ce5ccd55f2374bddeb6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 21:37:36 -0700 Subject: [PATCH 19/26] FUSE mount: proper error when deleting a non-empty folder --- weed/filesys/dir.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 2214b1ac7..843ada866 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -334,7 +334,10 @@ func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { glog.V(3).Infof("remove directory entry: %v", req) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false) if err != nil { - glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) + if strings.Contains(err.Error(), "non-empty") { + return fuse.EEXIST + } return fuse.ENOENT } From b0567077700cf5f8ed67f8ae840f821618aebd34 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 22:55:12 -0700 Subject: [PATCH 20/26] 1.88 --- k8s/seaweedfs/Chart.yaml | 2 +- k8s/seaweedfs/values.yaml | 2 +- weed/util/constants.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 8664e9394..73d9a67e4 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 1.87 \ No newline at end of file +version: 1.88 \ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml
b/k8s/seaweedfs/values.yaml index b2fbc17e5..6fb25e0a3 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "1.87" + imageTag: "1.88" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/util/constants.go b/weed/util/constants.go index 9f0e00506..10955acde 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 87) + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 88) COMMIT = "" ) From 5850f49c90a3ad70cd7a97d7d062886e4f9f1540 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 23:09:13 -0700 Subject: [PATCH 21/26] Update README.md --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5e18ac1e8..72492bb47 100644 --- a/README.md +++ b/README.md @@ -112,15 +112,15 @@ On top of the object store, optional [Filer] can support directories and POSIX a [Back to TOC](#table-of-contents) ## Filer Features ## -* [Filer server][Filer] provide "normal" directories and files via http. +* [Filer server][Filer] provides "normal" directories and files via http. * [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB. -* [Mount filer][Mount] to read and write files directly as a local directory via FUSE. -* [Amazon S3 compatible API][AmazonS3API] to access files with S3 tooling. -* [Hadoop Compatible File System][Hadoop] to access files from Hadoop/Spark/Flink/etc jobs. +* [Mount filer][Mount] reads and writes files directly as a local directory via FUSE. +* [Amazon S3 compatible API][AmazonS3API] accesses files with S3 tooling. +* [Hadoop Compatible File System][Hadoop] accesses files from Hadoop/Spark/Flink/etc jobs. * [Async Backup To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze. -* [WebDAV] access as a mapped drive on Mac and Windows, or from mobile devices. +* [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices. * [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data. -* [File TTL][FilerTTL] automatically purge file metadata and actual file data. +* [File TTL][FilerTTL] automatically purges file metadata and actual file data. * [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. 
[![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/) [Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files From 3f4aff5ddef0cb20f033295765be7fbc129e2157 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Aug 2020 09:09:35 -0700 Subject: [PATCH 22/26] s3: fix delimiter in list response --- weed/s3api/s3api_objects_list_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 9203c56f3..0a1f1a56c 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -159,7 +159,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys Marker: marker, NextMarker: lastEntryName, MaxKeys: maxKeys, - Delimiter: "/", + Delimiter: delimiter, IsTruncated: isTruncated, Contents: contents, CommonPrefixes: commonPrefixes, From 9ecc1170a39131125cf31a956989d9ed02a05511 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Aug 2020 14:35:53 -0700 Subject: [PATCH 23/26] listObjects and listObjectsV2 cannot query subdirectories #1418 fix https://github.com/chrislusf/seaweedfs/issues/1418 --- weed/s3api/s3api_objects_list_handlers.go | 209 ++++++++++++++++------ 1 file changed, 151 insertions(+), 58 deletions(-) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 0a1f1a56c..c62e776a7 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -2,6 +2,7 @@ package s3api import ( "context" + "encoding/xml" "fmt" "io" "net/http" @@ -12,10 +13,24 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) +type ListBucketResultV2 struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + MaxKeys int `xml:"MaxKeys"` + Delimiter string `xml:"Delimiter,omitempty"` + IsTruncated bool `xml:"IsTruncated"` + Contents []ListEntry `xml:"Contents,omitempty"` + CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` + ContinuationToken string `xml:"ContinuationToken,omitempty"` + NextContinuationToken string `xml:"NextContinuationToken,omitempty"` + KeyCount int `xml:"KeyCount"` + StartAfter string `xml:"StartAfter,omitempty"` +} + func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) { // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html // collect parameters bucket, _ := getBucketAndObject(r) - originalPrefix, marker, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) + originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) if maxKeys < 0 { writeErrorResponse(w, ErrInvalidMaxKeys, r.URL) return } @@ -34,7 +49,8 @@ return } - if marker == "" { + marker := continuationToken + if continuationToken == "" { marker = startAfter } @@ -44,8 +60,22 @@ func (s3a *S3ApiServer)
ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ writeErrorResponse(w, ErrInternalError, r.URL) return } + responseV2 := &ListBucketResultV2{ + XMLName: response.XMLName, + Name: response.Name, + CommonPrefixes: response.CommonPrefixes, + Contents: response.Contents, + ContinuationToken: continuationToken, + Delimiter: response.Delimiter, + IsTruncated: response.IsTruncated, + KeyCount: len(response.Contents), + MaxKeys: response.MaxKeys, + NextContinuationToken: response.NextMarker, + Prefix: response.Prefix, + StartAfter: startAfter, + } - writeSuccessResponseXML(w, encodeResponse(response)) + writeSuccessResponseXML(w, encodeResponse(responseV2)) } func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { @@ -76,70 +106,39 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ writeSuccessResponseXML(w, encodeResponse(response)) } -func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { +func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { // convert full path prefix into directory name and prefix for entry name - dir, prefix := filepath.Split(originalPrefix) - if strings.HasPrefix(dir, "/") { - dir = dir[1:] + reqDir, prefix := filepath.Split(originalPrefix) + if strings.HasPrefix(reqDir, "/") { + reqDir = reqDir[1:] } + if strings.HasSuffix(reqDir, "/") { + // remove trailing "/" + reqDir = reqDir[:len(reqDir)-1] + } + bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) + reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir) + + var contents []ListEntry + var commonPrefixes []PrefixEntry + var isTruncated bool + var doErr error + var nextMarker string // check filer err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.ListEntriesRequest{ - Directory: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, dir), - Prefix: prefix, - Limit: uint32(maxKeys + 1), - StartFromFileName: marker, - InclusiveStartFrom: false, - } - - stream, err := client.ListEntries(context.Background(), request) - if err != nil { - return fmt.Errorf("list buckets: %v", err) - } - - var contents []ListEntry - var commonPrefixes []PrefixEntry - var counter int - var lastEntryName string - var isTruncated bool - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - - entry := resp.Entry - counter++ - if counter > maxKeys { - isTruncated = true - break - } - lastEntryName = entry.Name + _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { - if entry.Name != ".uploads" { + if delimiter == "/" { prefix = fmt.Sprintf("%s%s/", dir, entry.Name) - commonPrefixes = append(commonPrefixes, PrefixEntry{ - Prefix: prefix, + Prefix: prefix[len(bucketPrefix):], }) - - if delimiter != "/" { - response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter) - for _, content := range response.Contents { - contents = append(contents, content) - } - } } } else { contents = append(contents, ListEntry{ - Key: fmt.Sprintf("%s%s", dir, entry.Name), + Key: fmt.Sprintf("%s%s", dir[len(bucketPrefix):], entry.Name), LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), ETag: "\"" + 
filer2.ETag(entry) + "\"", Size: int64(filer2.TotalSize(entry.Chunks)), @@ -150,14 +149,20 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys StorageClass: "STANDARD", }) } + }) + if doErr != nil { + return doErr + } + if !isTruncated { + nextMarker = "" } response = ListBucketResult{ Name: bucket, Prefix: originalPrefix, Marker: marker, - NextMarker: lastEntryName, + NextMarker: nextMarker, MaxKeys: maxKeys, Delimiter: delimiter, IsTruncated: isTruncated, @@ -165,14 +170,102 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys CommonPrefixes: commonPrefixes, } - glog.V(4).Infof("read directory: %v, found: %v, %+v", request, counter, response) - return nil }) return } +func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) { + // invariants + // prefix and marker should be under dir, marker may contain "/" + // maxKeys should be updated for each recursion + + if prefix == "/" && delimiter == "/" { + return + } + if maxKeys <= 0 { + return + } + + if strings.Contains(marker, "/") { + sepIndex := strings.Index(marker, "/") + subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:] + // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys) + subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn) + if subErr != nil { + err = subErr + return + } + maxKeys -= subCounter + nextMarker = subDir + "/" + subNextMarker + counter += subCounter + // finished processing this sub directory + marker = subDir + } + + // now marker is also a direct child of dir + request := &filer_pb.ListEntriesRequest{ + Directory: dir, + Prefix: prefix, + Limit: uint32(maxKeys + 1), + StartFromFileName: marker, + InclusiveStartFrom: false, + } + + stream, listErr := client.ListEntries(context.Background(), request) + if listErr != nil { + err = fmt.Errorf("list entries %+v: %v", request, listErr) + return + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + err = fmt.Errorf("iterating entries %+v: %v", request, recvErr) + return + } + } + if counter >= maxKeys { + isTruncated = true + return + } + entry := resp.Entry + nextMarker = entry.Name + if entry.IsDirectory { + println("ListEntries", dir, "dir:", entry.Name) + if entry.Name != ".uploads" { // FIXME no need to apply to all directories.
this extra also affects maxKeys + eachEntryFn(dir, entry) + if delimiter != "/" { + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter) + subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn) + if subErr != nil { + err = fmt.Errorf("doListFilerEntries2: %v", subErr) + return + } + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated) + counter += subCounter + nextMarker = entry.Name + "/" + subNextMarker + if subIsTruncated { + isTruncated = true + return + } + } else { + counter++ + } + } + } else { + // println("ListEntries", dir, "file:", entry.Name) + eachEntryFn(dir, entry) + counter++ + } + } + return +} + func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) { prefix = values.Get("prefix") token = values.Get("continuation-token") From f86c7d911a7e423c66840e86f0e8b99d4d1517f3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Aug 2020 14:42:25 -0700 Subject: [PATCH 24/26] remove println --- weed/s3api/s3api_objects_list_handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index c62e776a7..311442551 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -236,7 +236,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d entry := resp.Entry nextMarker = entry.Name if entry.IsDirectory { - println("ListEntries", dir, "dir:", entry.Name) + // println("ListEntries", dir, "dir:", entry.Name) if entry.Name != ".uploads" { // FIXME no need to apply to all directories. 
this extra also affects maxKeys eachEntryFn(dir, entry) if delimiter != "/" { From 4f195a54ca5c048a0684f3cf9fadfe0a1fb184c5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Aug 2020 16:34:47 -0700 Subject: [PATCH 25/26] 1.4.5 change the scope for hadoop depencies to provided --- other/java/client/pom.xml | 2 +- other/java/client/pom.xml.deploy | 2 +- other/java/client/pom_debug.xml | 2 +- other/java/hdfs2/dependency-reduced-pom.xml | 176 +++++++++++++++++++- other/java/hdfs2/pom.xml | 4 +- other/java/hdfs3/dependency-reduced-pom.xml | 2 +- other/java/hdfs3/pom.xml | 4 +- 7 files changed, 185 insertions(+), 7 deletions(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 4d8f93bff..6727f749f 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 4d8f93bff..6727f749f 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index bb2ba5e74..ed3f07298 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index d00291c98..c54f8d2a7 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -120,6 +120,180 @@ + + + org.apache.hadoop + hadoop-client + 2.9.2 + provided + + + hadoop-hdfs-client + org.apache.hadoop + + + hadoop-mapreduce-client-app + org.apache.hadoop + + + hadoop-yarn-api + org.apache.hadoop + + + hadoop-mapreduce-client-core + org.apache.hadoop + + + hadoop-mapreduce-client-jobclient + org.apache.hadoop + + + hadoop-annotations + org.apache.hadoop + + + + + org.apache.hadoop + hadoop-common + 2.9.2 + provided + + + commons-cli + commons-cli + + + commons-math3 + org.apache.commons + + + xmlenc + xmlenc + + + commons-io + commons-io + + + commons-net + commons-net + + + commons-collections + commons-collections + + + servlet-api + javax.servlet + + + jetty + org.mortbay.jetty + + + jetty-util + org.mortbay.jetty + + + jetty-sslengine + org.mortbay.jetty + + + jsp-api + javax.servlet.jsp + + + jersey-core + com.sun.jersey + + + jersey-json + com.sun.jersey + + + jersey-server + com.sun.jersey + + + log4j + log4j + + + jets3t + net.java.dev.jets3t + + + commons-lang + commons-lang + + + commons-configuration + commons-configuration + + + commons-lang3 + org.apache.commons + + + slf4j-log4j12 + org.slf4j + + + jackson-core-asl + org.codehaus.jackson + + + jackson-mapper-asl + org.codehaus.jackson + + + avro + org.apache.avro + + + hadoop-auth + org.apache.hadoop + + + jsch + com.jcraft + + + curator-client + org.apache.curator + + + curator-recipes + org.apache.curator + + + htrace-core4 + org.apache.htrace + + + zookeeper + org.apache.zookeeper + + + commons-compress + org.apache.commons + + + stax2-api + org.codehaus.woodstox + + + woodstox-core + com.fasterxml.woodstox + + + hadoop-annotations + org.apache.hadoop + + + + ossrh @@ -127,7 +301,7 @@ - 1.4.4 + 1.4.5 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 6d9191727..2c8d4ce32 100644 --- a/other/java/hdfs2/pom.xml +++ 
b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.4 + 1.4.5 2.9.2 @@ -147,6 +147,7 @@ org.apache.hadoop hadoop-client ${hadoop.version} + provided com.github.chrislusf @@ -157,6 +158,7 @@ org.apache.hadoop hadoop-common ${hadoop.version} + provided diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 0dcc49b3f..5f1e278f8 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.4.4 + 1.4.5 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 05a613759..b1bd27f74 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.4 + 1.4.5 3.1.1 @@ -147,6 +147,7 @@ org.apache.hadoop hadoop-client ${hadoop.version} + provided com.github.chrislusf @@ -157,6 +158,7 @@ org.apache.hadoop hadoop-common ${hadoop.version} + provided From e74dc4e4bca245828df180f516973e5d6ac2e1df Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 9 Aug 2020 21:56:09 -0700 Subject: [PATCH 26/26] add back fs node cache for renaming --- weed/filesys/dir.go | 25 +++-- weed/filesys/dir_rename.go | 1 + weed/filesys/file.go | 1 + weed/filesys/fscache.go | 207 +++++++++++++++++++++++++++++++++++ weed/filesys/fscache_test.go | 96 ++++++++++++++++ weed/filesys/wfs.go | 4 +- weed/util/bytes.go | 10 +- 7 files changed, 334 insertions(+), 10 deletions(-) create mode 100644 weed/filesys/fscache.go create mode 100644 weed/filesys/fscache_test.go diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 843ada866..08332d967 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -101,18 +101,22 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } + return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, + } + }) } func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + }) } @@ -312,6 +316,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.deleteFileChunks(entry.Chunks) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) glog.V(3).Infof("remove file: %v", req) @@ -328,6 +334,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { t := util.NewFullPath(dir.FullPath(), req.Name) + dir.wfs.fsNodeCache.DeleteFsNode(t) dir.wfs.metaCache.DeleteEntry(context.Background(), t) @@ -423,6 +430,8 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp func (dir *Dir) Forget() { glog.V(3).Infof("Forget dir %s", dir.FullPath()) + + dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } func (dir *Dir) maybeLoadEntry() error { diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 120dffd1d..da4f1b232 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -62,6 +62,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector } // 
fmt.Printf("rename path: %v => %v\n", oldPath, newPath) + dir.wfs.fsNodeCache.Move(oldPath, newPath) delete(dir.wfs.handles, oldPath.AsInode()) return err diff --git a/weed/filesys/file.go b/weed/filesys/file.go index f96cd8118..dcda93522 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -213,6 +213,7 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) glog.V(3).Infof("Forget file %s", t) + file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go new file mode 100644 index 000000000..b146f0615 --- /dev/null +++ b/weed/filesys/fscache.go @@ -0,0 +1,207 @@ +package filesys + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse/fs" +) + +type FsCache struct { + root *FsNode + sync.RWMutex +} +type FsNode struct { + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode +} + +func newFsCache(root fs.Node) *FsCache { + return &FsCache{ + root: &FsNode{ + node: root, + }, + } +} + +func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return nil + } + } + return t.node +} + +func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { + t := c.root + for _, p := range path.Split() { + t = t.ensureChild(p) + } + t.node = node +} + +func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) + if t != nil { + return t + } + t = genNodeFn() + c.doSetFsNode(path, t) + return t +} + +func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return + } + } + if t.parent != nil { + t.parent.disconnectChild(t) + } + t.deleteSelf() +} + +// oldPath and newPath are full path including the new name +func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + + c.Lock() + defer c.Unlock() + + // find old node + src := c.root + for _, p := range oldPath.Split() { + src = src.findChild(p) + if src == nil { + return src + } + } + if src.parent != nil { + src.parent.disconnectChild(src) + } + + // find new node + target := c.root + for _, p := range newPath.Split() { + target = target.ensureChild(p) + } + parent := target.parent + src.name = target.name + if dir, ok := src.node.(*Dir); ok { + dir.name = target.name // target is not Dir, but a shortcut + } + if f, ok := src.node.(*File); ok { + f.Name = target.name + if f.entry != nil { + f.entry.Name = f.Name + } + } + parent.disconnectChild(target) + + target.deleteSelf() + + src.connectToParent(parent) + + return src +} + +func (n *FsNode) connectToParent(parent *FsNode) { + n.parent = parent + oldNode := parent.findChild(n.name) + if oldNode != nil { + oldNode.deleteSelf() + } + if dir, ok := n.node.(*Dir); ok { + dir.parent = parent.node.(*Dir) + } + if f, ok := n.node.(*File); ok { + f.dir = parent.node.(*Dir) + } + n.childrenLock.Lock() + parent.children[n.name] = n 
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
new file mode 100644
index 000000000..67f9aacc8
--- /dev/null
+++ b/weed/filesys/fscache_test.go
@@ -0,0 +1,96 @@
+package filesys
+
+import (
+	"testing"
+
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestPathSplit(t *testing.T) {
+	parts := util.FullPath("/").Split()
+	if len(parts) != 0 {
+		t.Errorf("expecting an empty list, but getting %d", len(parts))
+	}
+
+	parts = util.FullPath("/readme.md").Split()
+	if len(parts) != 1 {
+		t.Errorf("expecting a single item, but getting %d", len(parts))
+	}
+
+}
+
+func TestFsCache(t *testing.T) {
+
+	cache := newFsCache(nil)
+
+	x := cache.GetFsNode(util.FullPath("/y/x"))
+	if x != nil {
+		t.Errorf("unexpected node!")
+	}
+
+	p := util.FullPath("/a/b/c")
+	cache.SetFsNode(p, &File{Name: "cc"})
+	tNode := cache.GetFsNode(p)
+	tFile := tNode.(*File)
+	if tFile.Name != "cc" {
+		t.Errorf("expecting node name cc, but getting %s", tFile.Name)
+	}
+
+	cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+	cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+	cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
+	cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+	cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+	// /a/b exists only as an intermediate node, so it carries no fs.Node
+	b := cache.GetFsNode(util.FullPath("/a/b"))
+	if b != nil {
+		t.Errorf("unexpected node!")
+	}
+
+	a := cache.GetFsNode(util.FullPath("/a"))
+	if a == nil {
+		t.Errorf("missing node!")
+	}
+
+	cache.DeleteFsNode(util.FullPath("/a"))
+	b = cache.GetFsNode(util.FullPath("/a/b"))
+	if b != nil {
+		t.Errorf("unexpected node!")
+	}
+
+	a = cache.GetFsNode(util.FullPath("/a"))
+	if a != nil {
+		t.Errorf("wrong DeleteFsNode!")
+	}
+
+	z := cache.GetFsNode(util.FullPath("/z"))
+	if z == nil {
+		t.Errorf("missing node!")
+	}
+
+	y := cache.GetFsNode(util.FullPath("/x/y"))
+	if y != nil {
+		t.Errorf("unexpected node!")
+	}
+
+}
+
+func TestFsCacheMove(t *testing.T) {
+
+	cache := newFsCache(nil)
+
+	cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+	cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+	cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+	cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+	cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
+
+	d := cache.GetFsNode(util.FullPath("/z/x/d"))
+	if d == nil {
+		t.Errorf("unexpected nil node!")
+	}
+	if d.(*File).Name != "dd" {
+		t.Errorf("expecting dd, but getting %s", d.(*File).Name)
+	}
+
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 9ef597024..22f0b655a 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -63,6 +63,7 @@ type WFS struct {
 	stats statsCache
 
 	root        fs.Node
+	fsNodeCache *FsCache
 
 	chunkCache *chunk_cache.ChunkCache
 	metaCache  *meta_cache.MetaCache
@@ -82,7 +83,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 			},
 		},
 	}
-	cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+	cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
 	cacheDir := path.Join(option.CacheDir, cacheUniqueId)
 	if option.CacheSizeMB > 0 {
 		os.MkdirAll(cacheDir, 0755)
@@ -100,6 +101,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 	})
 
 	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+	wfs.fsNodeCache = newFsCache(wfs.root)
 
 	return wfs
 }
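
A note on the cacheUniqueId change above: the standard base64 alphabet includes '+' and '/', which are awkward inside a directory name handed to path.Join, while the hex digest from the new util.Md5String (added in the bytes.go hunk below) is always [0-9a-f]. A small sketch of the resulting path; the input strings are purely illustrative:

	// hypothetical inputs; only the shape of the output matters here
	id := util.Md5String([]byte("localhost:18888" + "/" + "someVersion"))[0:4] // four hex chars
	cacheDir := path.Join("/tmp/seaweedfs_cache", id)
	_ = cacheDir
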
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 5076c3e67..890d50586 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -115,9 +115,17 @@ func Base64Encode(data []byte) string {
 }
 
 func Base64Md5(data []byte) string {
+	return Base64Encode(Md5(data))
+}
+
+func Md5(data []byte) []byte {
 	hash := md5.New()
 	hash.Write(data)
-	return Base64Encode(hash.Sum(nil))
+	return hash.Sum(nil)
+}
+
+func Md5String(data []byte) string {
+	return fmt.Sprintf("%x", Md5(data))
 }
 
 func Base64Md5ToBytes(contentMd5 string) []byte {
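
To make the refactor concrete (an illustrative sketch, not part of the patch; it assumes Base64Encode uses the standard base64 alphabet): Base64Md5 keeps its previous behavior, now expressed through the shared Md5 helper, and Md5String adds a hex rendering of the same digest. Using the well-known md5 of "hello":

	sum := util.Md5([]byte("hello"))       // raw 16-byte digest
	hex := util.Md5String([]byte("hello")) // "5d41402abc4b2a76b9719d911017c592"
	b64 := util.Base64Md5([]byte("hello")) // "XUFAKrxLKna5cZ2REBfFkg=="
	_, _, _ = sum, hex, b64
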