diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
index c91fa2bfe..e668e1266 100644
--- a/other/java/hdfs/pom.xml
+++ b/other/java/hdfs/pom.xml
@@ -40,6 +40,11 @@
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+            <version>4.5.2</version>
+        </dependency>
    </dependencies>
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 54e2d65cf..50f05a7c7 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -12,6 +12,7 @@ import org.apache.hadoop.util.Progressable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URI;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
@@ -55,16 +56,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
}
- public FSDataInputStream open(Path path, int i) throws IOException {
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
return null;
}
- public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException {
- return null;
+ public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
+ final short replication, final long blockSize, final Progressable progress) throws IOException {
+ path = qualify(path);
+
+ try {
+ String replicaPlacement = String.format("%03d", replication - 1);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement);
+ return new FSDataOutputStream(outputStream, statistics);
+        } catch (Exception ex) {
+            throw new IOException("createFile failed for " + path, ex);
+        }
}
- public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
- return null;
+ public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
+ path = qualify(path);
+ try {
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
+ return new FSDataOutputStream(outputStream, statistics);
+        } catch (Exception ex) {
+            throw new IOException("append failed for " + path, ex);
+        }
}
public boolean rename(Path src, Path dst) throws IOException {
@@ -73,7 +89,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
if (parentFolder == null) {
return false;
}
- if (src.equals(dst)){
+ if (src.equals(dst)) {
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);
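
Note on the replica-placement mapping in create(): HDFS replication counts total
copies, while a SeaweedFS placement string counts extra copies, one digit each
for other data centers, other racks, and other servers in the same rack. A
minimal sketch of the conversion (digit semantics per the SeaweedFS replication
docs):

    short hdfsReplication = 3;                                       // 3 copies total
    String placement = String.format("%03d", hdfsReplication - 1);   // "002" = 2 extra copies
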
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index be60f3638..d890762a0 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -9,6 +9,8 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -155,4 +157,43 @@ public class SeaweedFileSystemStore {
return true;
}
+
+ public OutputStream createFile(final Path path,
+ final boolean overwrite,
+ FsPermission permission,
+ String replication) throws IOException {
+
+ permission = permission == null ? FsPermission.getFileDefault() : permission;
+
+ LOG.debug("createFile path: {} overwrite: {} permission: {}",
+ path,
+ overwrite,
+ permission.toString());
+
+ UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
+
+        // start with no entry; it is populated either from the existing file
+        // (append) or from freshly built attributes (new file)
+        FilerProto.Entry.Builder entry = null;
+        long writePosition = 0;
+        if (!overwrite) {
+            FilerProto.Entry existingEntry = lookupEntry(path);
+            if (existingEntry != null) {
+                // only dereference the existing entry when it was actually found,
+                // otherwise this path would throw a NullPointerException
+                entry = FilerProto.Entry.newBuilder().mergeFrom(existingEntry);
+                writePosition = existingEntry.getAttributes().getFileSize();
+                replication = existingEntry.getAttributes().getReplication();
+            }
+        }
+        if (entry == null) {
+            entry = FilerProto.Entry.newBuilder()
+                    .setName(path.getName())    // the filer stores the entry under its basename
+                    .setIsDirectory(false)
+                    .setAttributes(FilerProto.FuseAttributes.newBuilder()
+                            .setFileMode(permission.toOctal())
+                            .setReplication(replication)
+                            .setCrtime(System.currentTimeMillis() / 1000L)
+                            .setUserName(userGroupInformation.getUserName())
+                            .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
+                    );
+        }
+
+ return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication);
+
+ }
}
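
For orientation, a hedged usage sketch of the new createFile() entry point; the
filer address and target path below are placeholder assumptions, not part of
this change:

    // "localhost:8888" is an assumed filer address.
    SeaweedFileSystemStore store = new SeaweedFileSystemStore("localhost", 8888);
    OutputStream out = store.createFile(
            new Path("/user/demo/hello.txt"),  // hypothetical target path
            true,                              // overwrite any existing entry
            FsPermission.getFileDefault(),     // default file permission
            "000");                            // no extra replicas
    out.write("hello".getBytes());
    out.close();                               // flushes chunks, then writes metadata
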
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index d0d488b1d..77886183c 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -4,6 +4,7 @@ package seaweed.hdfs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import seaweedfs.client.FilerGrpcClient;
@@ -12,8 +13,6 @@ import seaweedfs.client.FilerProto;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -26,12 +25,12 @@ import java.util.concurrent.TimeUnit;
public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
private final FilerGrpcClient filerGrpcClient;
-    private final List<FilerProto.Entry> entries = new ArrayList<>();
- private final String path;
+ private final Path path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
    private final ExecutorCompletionService<Void> completionService;
+ private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
private boolean supportFlush = true;
@@ -41,12 +40,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private byte[] buffer;
private int bufferIndex;
    private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private String replication = "000";
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient,
- final String path,
- final long position,
- final int bufferSize) {
+ public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
this.filerGrpcClient = filerGrpcClient;
+ this.replication = replication;
this.path = path;
this.position = position;
this.closed = false;
@@ -67,11 +66,14 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
                new LinkedBlockingQueue<>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+        this.entry = entry;
    }
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
- SeaweedWrite.writeMeta(filerGrpcClient, path, entries);
+ SeaweedWrite.writeMeta(filerGrpcClient, path, entry);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -221,8 +223,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
- FilerProto.Entry entry = SeaweedWrite.writeData(filerGrpcClient, offset, bytes, 0, bytesLength);
- entries.add(entry);
+ SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
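
Design note: the stream no longer accumulates finished FilerProto.Entry objects
in a list. A single Entry.Builder is threaded through instead; each uploaded
buffer appends one FileChunk to it, and the metadata is committed once. Roughly,
per flushed buffer (a sketch of the flow using the calls above, not new API):

    // upload the buffer and record the resulting chunk on the shared builder
    SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
    // on flush()/close(): persist all accumulated chunks as the file's metadata
    SeaweedWrite.writeMeta(filerGrpcClient, path, entry);
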
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
index 1624eb739..227c9bde8 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedWrite.java
@@ -1,18 +1,84 @@
package seaweed.hdfs;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.mime.HttpMultipartMode;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.impl.client.HttpClientBuilder;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
-import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
public class SeaweedWrite {
- public static FilerProto.Entry writeData(final FilerGrpcClient filerGrpcClient, final long offset,
- final byte[] bytes, final long bytesOffset, final long bytesLength) {
- return null;
+
+ public static void writeData(FilerProto.Entry.Builder entry,
+ final String replication,
+ final FilerGrpcClient filerGrpcClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength) throws IOException {
+        FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
+                FilerProto.AssignVolumeRequest.newBuilder()
+                        .setCollection("")
+                        .setReplication(replication)
+                        .setDataCenter("")
+                        .setTtlSec(0)
+                        .build());
+ String fileId = response.getFileId();
+ String url = response.getUrl();
+ String targetUrl = String.format("http://%s/%s", url, fileId);
+
+ String etag = multipartUpload(targetUrl, bytes, bytesOffset, bytesLength);
+
+ entry.addChunks(FilerProto.FileChunk.newBuilder()
+ .setFileId(fileId)
+ .setOffset(offset)
+ .setSize(bytesLength)
+                .setMtime(System.currentTimeMillis() / 1000L)
+ .setETag(etag)
+ );
+
}
public static void writeMeta(final FilerGrpcClient filerGrpcClient,
-                                 final String path, final List<FilerProto.Entry> entries) {
- return;
+ final Path path, final FilerProto.Entry.Builder entry) {
+ filerGrpcClient.getBlockingStub().createEntry(
+ FilerProto.CreateEntryRequest.newBuilder()
+ .setDirectory(path.getParent().toUri().getPath())
+ .setEntry(entry)
+ .build()
+ );
+ }
+
+ private static String multipartUpload(String targetUrl,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength) throws IOException {
+
+ HttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
+
+ InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
+
+ HttpPost post = new HttpPost(targetUrl);
+
+ post.setEntity(MultipartEntityBuilder.create()
+ .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+ .addBinaryBody("upload", inputStream)
+ .build());
+
+ HttpResponse response = client.execute(post);
+
+        // guard against a missing ETag header instead of NPE-ing on getValue()
+        Header etagHeader = response.getLastHeader("ETag");
+        String etag = etagHeader == null ? "" : etagHeader.getValue();
+
+        if (etag.length() > 1 && etag.startsWith("\"") && etag.endsWith("\"")) {
+            etag = etag.substring(1, etag.length() - 1);
+        }
+
+        return etag;
}
}
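
Taken together, SeaweedWrite follows the usual SeaweedFS write protocol: ask the
filer to assign a volume and file id, POST the bytes to the returned volume
server as a multipart upload, then record the chunk in the file's metadata. A
standalone sketch of the upload step, assuming a URL and fid already returned by
assignVolume (both values below are placeholders):

    // placeholder volume-server address and file id from a prior assignVolume call
    String targetUrl = "http://127.0.0.1:8080/3,01637037d6";
    HttpClient client = HttpClientBuilder.create().setUserAgent("hdfs-client").build();
    HttpPost post = new HttpPost(targetUrl);
    post.setEntity(MultipartEntityBuilder.create()
            .setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
            .addBinaryBody("upload", "hello".getBytes())  // field name assumed not significant
            .build());
    HttpResponse response = client.execute(post);         // volume server responds with an ETag header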