From 11eb014311144e8b3e450c70fb3b1877bd02d534 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 25 Nov 2018 23:49:05 -0800 Subject: [PATCH] SeaweedFileSystem add SeaweedOutputStream to write --- other/java/hdfs/pom.xml | 5 ++ .../java/seaweed/hdfs/SeaweedFileSystem.java | 28 +++++-- .../seaweed/hdfs/SeaweedFileSystemStore.java | 41 ++++++++++ .../seaweed/hdfs/SeaweedOutputStream.java | 23 +++--- .../main/java/seaweed/hdfs/SeaweedWrite.java | 78 +++++++++++++++++-- 5 files changed, 152 insertions(+), 23 deletions(-) diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml index c91fa2bfe..e668e1266 100644 --- a/other/java/hdfs/pom.xml +++ b/other/java/hdfs/pom.xml @@ -40,6 +40,11 @@ hadoop-common ${hadoop.version} + + org.apache.httpcomponents + httpmime + 4.5.2 + diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 54e2d65cf..50f05a7c7 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -12,6 +12,7 @@ import org.apache.hadoop.util.Progressable; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.net.URI; public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { @@ -55,16 +56,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); } - public FSDataInputStream open(Path path, int i) throws IOException { + public FSDataInputStream open(Path path, int bufferSize) throws IOException { return null; } - public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException { - return null; + public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, + final short replication, final long blockSize, final Progressable progress) throws IOException { + path = qualify(path); + + try { + String replicaPlacement = String.format("%03d", replication - 1); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement); + return new FSDataOutputStream(outputStream, statistics); + } catch (Exception ex) { + return null; + } } - public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException { - return null; + public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { + path = qualify(path); + try { + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, ""); + return new FSDataOutputStream(outputStream, statistics); + } catch (Exception ex) { + return null; + } } public boolean rename(Path src, Path dst) throws IOException { @@ -73,7 +89,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { if (parentFolder == null) { return false; } - if (src.equals(dst)){ + if (src.equals(dst)) { return true; } FileStatus dstFileStatus = getFileStatus(dst); diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index be60f3638..d890762a0 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -9,6 +9,8 @@ import org.slf4j.LoggerFactory; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -155,4 +157,43 @@ public class SeaweedFileSystemStore { return true; } + + public OutputStream createFile(final Path path, + final boolean overwrite, + FsPermission permission, + String replication) throws IOException { + + permission = permission == null ? FsPermission.getFileDefault() : permission; + + LOG.debug("createFile path: {} overwrite: {} permission: {}", + path, + overwrite, + permission.toString()); + + UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); + + FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder(); + long writePosition = 0; + if (!overwrite) { + FilerProto.Entry existingEntry = lookupEntry(path); + if (existingEntry != null) { + entry.mergeFrom(existingEntry); + } + writePosition = existingEntry.getAttributes().getFileSize(); + replication = existingEntry.getAttributes().getReplication(); + } + if (entry == null) { + entry = FilerProto.Entry.newBuilder() + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(permission.toOctal()) + .setReplication(replication) + .setCrtime(System.currentTimeMillis() / 1000L) + .setUserName(userGroupInformation.getUserName()) + .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) + ); + } + + return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication); + + } } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index d0d488b1d..77886183c 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -4,6 +4,7 @@ package seaweed.hdfs; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import seaweedfs.client.FilerGrpcClient; @@ -12,8 +13,6 @@ import seaweedfs.client.FilerProto; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; import java.util.Locale; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedDeque; @@ -26,12 +25,12 @@ import java.util.concurrent.TimeUnit; public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities { private final FilerGrpcClient filerGrpcClient; - private final List entries = new ArrayList<>(); - private final String path; + private final Path path; private final int bufferSize; private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; + private FilerProto.Entry.Builder entry; private long position; private boolean closed; private boolean supportFlush = true; @@ -41,12 +40,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea private byte[] buffer; private int bufferIndex; private ConcurrentLinkedDeque writeOperations; + private String replication = "000"; - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, - final String path, - final long position, - final int bufferSize) { + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { this.filerGrpcClient = filerGrpcClient; + this.replication = replication; this.path = path; this.position = position; this.closed = false; @@ -67,11 +66,14 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea new LinkedBlockingQueue()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + this.entry = entry; + + } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - SeaweedWrite.writeMeta(filerGrpcClient, path, entries); + SeaweedWrite.writeMeta(filerGrpcClient, path, entry); } catch (Exception ex) { throw new IOException(ex); } @@ -221,8 +223,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea @Override public Void call() throws Exception { // originally: client.append(path, offset, bytes, 0, bytesLength); - FilerProto.Entry entry = SeaweedWrite.writeData(filerGrpcClient, offset, bytes, 0, bytesLength); - entries.add(entry); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); return null; } }); diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java index 1624eb739..227c9bde8 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java @@ -1,18 +1,84 @@ package seaweed.hdfs; +import org.apache.hadoop.fs.Path; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.mime.HttpMultipartMode; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.impl.client.HttpClientBuilder; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; -import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; public class SeaweedWrite { - public static FilerProto.Entry writeData(final FilerGrpcClient filerGrpcClient, final long offset, - final byte[] bytes, final long bytesOffset, final long bytesLength) { - return null; + + public static void writeData(FilerProto.Entry.Builder entry, + final String replication, + final FilerGrpcClient filerGrpcClient, + final long offset, + final byte[] bytes, + final long bytesOffset, final long bytesLength) throws IOException { + FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection("") + .setReplication(replication) + .setDataCenter("") + .setReplication("") + .setTtlSec(0) + .build()); + String fileId = response.getFileId(); + String url = response.getUrl(); + String targetUrl = String.format("http://%s/%s", url, fileId); + + String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength); + + entry.addChunks(FilerProto.FileChunk.newBuilder() + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) + ); + } public static void writeMeta(final FilerGrpcClient filerGrpcClient, - final String path, final List entries) { - return; + final Path path, final FilerProto.Entry.Builder entry) { + filerGrpcClient.getBlockingStub().createEntry( + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setEntry(entry) + .build() + ); + } + + private static String multipartUpload(String targetUrl, + final byte[] bytes, + final long bytesOffset, final long bytesLength) throws IOException { + + HttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build(); + + InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + + HttpPost post = new HttpPost(targetUrl); + + post.setEntity(MultipartEntityBuilder.create() + .setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addBinaryBody("upload", inputStream) + .build()); + + HttpResponse response = client.execute(post); + + String etag = response.getLastHeader("ETag").getValue(); + + if (etag != null && etag.startsWith("\"") && etag.endsWith("\"")) { + etag = etag.substring(1, etag.length() - 1); + } + + return etag; } }