Browse Source

Java: SeaweedOutputStream refactoring

pull/1784/head
Chris Lu 4 years ago
parent
commit
9fa7977714
  1. 53
      other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

53
other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java

@ -21,8 +21,9 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount; private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor; private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService; private final ExecutorCompletionService<Void> completionService;
private final FilerProto.Entry.Builder entry;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
private long position; private long position;
private boolean closed; private boolean closed;
private volatile IOException lastError; private volatile IOException lastError;
@ -31,21 +32,25 @@ public class SeaweedOutputStream extends OutputStream {
private ByteBuffer buffer; private ByteBuffer buffer;
private long outputIndex; private long outputIndex;
private String replication = "000"; private String replication = "000";
private boolean shouldSaveMetadata = false;
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) {
this(filerGrpcClient, fullpath, "000"); this(filerGrpcClient, fullpath, "000");
} }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) {
this.replication = replication;
this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000");
}
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient; this.filerGrpcClient = filerGrpcClient;
this.path = fullpath;
this.position = 0;
this.replication = replication;
this.path = path;
this.position = position;
this.closed = false; this.closed = false;
this.lastError = null; this.lastError = null;
this.lastFlushOffset = 0; this.lastFlushOffset = 0;
this.bufferSize = 8 * 1024 * 1024;
this.bufferSize = bufferSize;
this.buffer = ByteBufferPool.request(bufferSize); this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0; this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>(); this.writeOperations = new ConcurrentLinkedDeque<>();
@ -60,6 +65,8 @@ public class SeaweedOutputStream extends OutputStream {
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
if (this.entry == null) {
long now = System.currentTimeMillis() / 1000L; long now = System.currentTimeMillis() / 1000L;
this.entry = FilerProto.Entry.newBuilder() this.entry = FilerProto.Entry.newBuilder()
@ -72,36 +79,8 @@ public class SeaweedOutputStream extends OutputStream {
.setMtime(now) .setMtime(now)
.clearGroupName() .clearGroupName()
); );
this.shouldSaveMetadata = true;
} }
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient;
this.replication = replication;
this.path = path;
this.position = position;
this.closed = false;
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
120L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
} }
public static String getParentDirectory(String path) { public static String getParentDirectory(String path) {
@ -120,7 +99,7 @@ public class SeaweedOutputStream extends OutputStream {
return path; return path;
} }
int lastSlashIndex = path.lastIndexOf("/"); int lastSlashIndex = path.lastIndexOf("/");
return path.substring(lastSlashIndex+1);
return path.substring(lastSlashIndex + 1);
} }
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
@ -218,10 +197,6 @@ public class SeaweedOutputStream extends OutputStream {
} }
} }
if (shouldSaveMetadata) {
SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
}
} }
private synchronized void writeCurrentBufferToService() throws IOException { private synchronized void writeCurrentBufferToService() throws IOException {

Loading…
Cancel
Save