diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 519ff0fd9..8b26c242c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream { public SeaweedInputStream( final FilerGrpcClient filerGrpcClient, - final String dir, final String name) throws IOException { + final String fullpath) throws IOException { this.filerGrpcClient = filerGrpcClient; - this.path = dir; + this.path = fullpath; FilerClient filerClient = new FilerClient(filerGrpcClient); - this.entry = filerClient.lookupEntry(dir, name); + this.entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(fullpath), + SeaweedOutputStream.getFileName(fullpath)); this.contentLength = SeaweedRead.fileSize(entry); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index b09a15a5c..a98bbd1ab 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - + protected final boolean supportFlush = false; // true; private final FilerGrpcClient filerGrpcClient; private final String path; private final int bufferSize; @@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; private final FilerProto.Entry.Builder entry; - protected final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; @@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; + private boolean shouldSaveMetadata = false; + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { + this.filerGrpcClient = filerGrpcClient; + this.path = fullpath; + this.position = 0; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = 8 * 1024 * 1024; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + this.shouldSaveMetadata = true; + + } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { @@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); + if (lastSlashIndex == 0) { + return "/"; + } return path.substring(0, lastSlashIndex); } + public static String getFileName(String path) { + if (path.indexOf("/") < 0) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(lastSlashIndex+1); + } + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); @@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream { threadExecutor.shutdownNow(); } } + + if (shouldSaveMetadata) { + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + } + } private synchronized void writeCurrentBufferToService() throws IOException { diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java index fad1471b6..12eab1a2c 100644 --- a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java @@ -23,7 +23,7 @@ public class UnzipFile { long localProcessTime = startTime2 - startTime; SeaweedInputStream seaweedInputStream = new SeaweedInputStream( - filerGrpcClient, "/", "test.zip"); + filerGrpcClient, "/test.zip"); parseZip(seaweedInputStream); long swProcessTime = System.currentTimeMillis() - startTime2; diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java new file mode 100644 index 000000000..b0bd54997 --- /dev/null +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/WriteFile.java @@ -0,0 +1,48 @@ +package com.seaweedfs.examples; + +import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.SeaweedInputStream; +import seaweedfs.client.SeaweedOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class WriteFile { + + public static void main(String[] args) throws IOException { + + FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + + SeaweedInputStream seaweedInputStream = new SeaweedInputStream( + filerGrpcClient, "/test.zip"); + unZipFiles(filerGrpcClient, seaweedInputStream); + + } + + public static void unZipFiles(FilerGrpcClient filerGrpcClient, InputStream is) throws IOException { + ZipInputStream zin = new ZipInputStream(is); + ZipEntry ze; + while ((ze = zin.getNextEntry()) != null) { + + String filename = ze.getName(); + if (filename.indexOf("/") >= 0) { + filename = filename.substring(filename.lastIndexOf("/") + 1); + } + if (filename.length()==0) { + continue; + } + + SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerGrpcClient, "/test/"+filename); + byte[] bytesIn = new byte[16 * 1024]; + int read = 0; + while ((read = zin.read(bytesIn))!=-1) { + seaweedOutputStream.write(bytesIn,0,read); + } + seaweedOutputStream.close(); + + System.out.println(ze.getName()); + } + } +}