From 9fa7977714d99e546cee32e02b5f7fdf3528078b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 20:30:49 -0800 Subject: [PATCH] Java: SeaweedOutputStream refactoring --- .../seaweedfs/client/SeaweedOutputStream.java | 61 ++++++------------- 1 file changed, 18 insertions(+), 43 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 94f34b221..f9df22c9b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -21,8 +21,9 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; - private final FilerProto.Entry.Builder entry; private final ConcurrentLinkedDeque writeOperations; + private final boolean shouldSaveMetadata = false; + private FilerProto.Entry.Builder entry; private long position; private boolean closed; private volatile IOException lastError; @@ -31,49 +32,13 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; - private boolean shouldSaveMetadata = false; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { this(filerGrpcClient, fullpath, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { - this.replication = replication; - this.filerGrpcClient = filerGrpcClient; - this.path = fullpath; - this.position = 0; - this.closed = false; - this.lastError = null; - this.lastFlushOffset = 0; - this.bufferSize = 8 * 1024 * 1024; - this.buffer = ByteBufferPool.request(bufferSize); - this.outputIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); - - this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); - - long now = System.currentTimeMillis() / 1000L; - - this.entry = FilerProto.Entry.newBuilder() - .setName(getFileName(path)) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(0755) - .setReplication(replication) - .setCrtime(now) - .setMtime(now) - .clearGroupName() - ); - this.shouldSaveMetadata = true; - + this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, @@ -101,6 +66,20 @@ public class SeaweedOutputStream extends OutputStream { this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; + if (this.entry == null) { + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + } } @@ -120,7 +99,7 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); - return path.substring(lastSlashIndex+1); + return path.substring(lastSlashIndex + 1); } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { @@ -218,10 +197,6 @@ public class SeaweedOutputStream extends OutputStream { } } - if (shouldSaveMetadata) { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); - } - } private synchronized void writeCurrentBufferToService() throws IOException {