From 4119c61df84305886e6116705f1ad08d3da69328 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 3 Dec 2018 20:25:57 -0800
Subject: [PATCH] HCFS can read files

---
 .gitignore                                    |  1 +
 .../java/seaweed/hdfs/SeaweedFileSystem.java  | 17 +++++++--
 .../seaweed/hdfs/SeaweedFileSystemStore.java  | 31 ++++++++++++++--
 .../java/seaweed/hdfs/SeaweedInputStream.java |  6 ++-
 .../main/java/seaweed/hdfs/SeaweedRead.java   | 37 +++++++++++++++----
 5 files changed, 77 insertions(+), 15 deletions(-)

diff --git a/.gitignore b/.gitignore
index 98a401933..a56dfb8a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,3 +79,4 @@ test_data
 build
 target
 *.class
+other/java/hdfs/dependency-reduced-pom.xml
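
Review note (not part of the patch): with open() now returning an FSDataInputStream backed by SeaweedInputStream, the read path can be exercised end to end through the plain Hadoop FileSystem API. The sketch below is illustrative only; the seaweedfs:// scheme registration key, the localhost:8888 filer address (matching FS_SEAWEED_DEFAULT_PORT), and the file path are assumptions, not anything this patch defines.

    // Hedged sketch: reading a file through the HCFS layer added by this patch.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class SeaweedReadSmokeTest {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical scheme registration; the actual mapping depends on deployment.
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");

            try (FileSystem fs = FileSystem.get(new URI("seaweedfs://localhost:8888/"), conf);
                 FSDataInputStream in = fs.open(new Path("/some/dir/file.txt"))) {
                byte[] buf = new byte[4096];
                int n;
                // Copy the file to stdout through the new SeaweedInputStream-backed stream.
                while ((n = in.read(buf)) > 0) {
                    System.out.write(buf, 0, n);
                }
            }
        }
    }
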
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 79c2641ec..f2dd01385 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -13,9 +13,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
     public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
@@ -23,6 +26,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
+    private static int BUFFER_SIZE = 16 * 1024 * 1024;
 
     private URI uri;
     private Path workingDirectory = new Path("/");
@@ -53,6 +57,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
         port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
         conf.setInt(FS_SEAWEED_FILER_PORT, port);
+        conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
+
         setConf(conf);
 
         this.uri = uri;
 
@@ -65,7 +71,12 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
         path = qualify(path);
 
-        return null;
+        try {
+            InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
+            return new FSDataInputStream(inputStream);
+        } catch (Exception ex) {
+            return null;
+        }
     }
 
     public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
@@ -77,7 +88,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
 
         try {
             String replicaPlacement = String.format("%03d", replication - 1);
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement);
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             return null;
@@ -90,7 +101,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
         path = qualify(path);
 
         try {
-            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
+            OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
             return new FSDataOutputStream(outputStream, statistics);
         } catch (Exception ex) {
             return null;
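
Review note: in create(), the caller-supplied bufferSize now flows through to the store (replacing the hard-coded 16 MB buffer in SeaweedFileSystemStore below), and the HDFS replication factor is folded into a SeaweedFS replica-placement string before createFile is called. A minimal sketch of that mapping, with a hypothetical helper name:

    final class ReplicaPlacement {
        // Mirrors the String.format("%03d", replication - 1) call in create():
        // an HDFS replication factor of N becomes the three-digit placement
        // string for N - 1 extra copies, e.g. 1 -> "000", 2 -> "001", 3 -> "002".
        static String fromHdfsReplication(short replication) {
            return String.format("%03d", replication - 1);
        }
    }
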
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 7cc12424b..dd68e53f1 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,6 +1,7 @@
 package seaweed.hdfs;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -9,7 +10,9 @@ import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,7 +38,6 @@ public class SeaweedFileSystemStore {
         if (isDirectory) {
             p = p | 1 << 31;
         }
-        System.out.println(permission + " = " + p);
         return p;
     }
 
@@ -126,7 +128,7 @@ public class SeaweedFileSystemStore {
 
     private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
         FilerProto.FuseAttributes attributes = entry.getAttributes();
-        long length = attributes.getFileSize();
+        long length = SeaweedRead.totalSize(entry.getChunksList());
         boolean isDir = entry.getIsDirectory();
         int block_replication = 1;
         int blocksize = 512;
@@ -206,6 +208,7 @@ public class SeaweedFileSystemStore {
     public OutputStream createFile(final Path path,
                                    final boolean overwrite,
                                    FsPermission permission,
+                                   int bufferSize,
                                    String replication) throws IOException {
 
         permission = permission == null ? FsPermission.getFileDefault() : permission;
@@ -226,7 +229,7 @@ public class SeaweedFileSystemStore {
                 entry.mergeFrom(existingEntry);
                 entry.getAttributesBuilder().setMtime(now);
             }
-            writePosition = existingEntry.getAttributes().getFileSize();
+            writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
             replication = existingEntry.getAttributes().getReplication();
         }
         if (entry == null) {
@@ -243,7 +246,27 @@ public class SeaweedFileSystemStore {
             );
         }
 
-        return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication);
+        return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
 
     }
+
+    public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+                                       int bufferSize) throws IOException {
+
+        LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+
+        int readAheadQueueDepth = 2;
+        FilerProto.Entry entry = lookupEntry(path);
+
+        if (entry == null) {
+            throw new FileNotFoundException("read non-exist file " + path);
+        }
+
+        return new SeaweedInputStream(filerGrpcClient,
+            statistics,
+            path.toUri().getPath(),
+            entry,
+            bufferSize,
+            readAheadQueueDepth);
+    }
 }
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 0cd118f22..b31cae166 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -6,6 +6,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 
@@ -15,6 +17,8 @@ import java.util.List;
 
 public class SeaweedInputStream extends FSInputStream {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+
     private final FilerGrpcClient filerGrpcClient;
     private final Statistics statistics;
     private final String path;
@@ -45,7 +49,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.statistics = statistics;
         this.path = path;
         this.entry = entry;
-        this.contentLength = entry.getAttributes().getFileSize();
+        this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
         this.bufferSize = bufferSize;
         this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
         this.readAheadEnabled = true;
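
Review note: getFileStatus() and SeaweedInputStream.contentLength now take the file length from the chunk list via SeaweedRead.totalSize(...) rather than from FuseAttributes.fileSize. The sketch below illustrates the semantics with made-up chunk values: the length is the largest offset + size over all chunks, so a chunk that rewrites part of an earlier one is not double-counted.

    import seaweed.hdfs.SeaweedRead;
    import seaweedfs.client.FilerProto;

    import java.util.Arrays;

    class TotalSizeIllustration {
        static void demo() {
            // Hypothetical chunks: the second partially overwrites the first.
            FilerProto.FileChunk first = FilerProto.FileChunk.newBuilder()
                    .setOffset(0).setSize(8 * 1024 * 1024).build();
            FilerProto.FileChunk second = FilerProto.FileChunk.newBuilder()
                    .setOffset(4 * 1024 * 1024).setSize(8 * 1024 * 1024).build();

            // totalSize() takes the max of (offset + size): 12 MB here, not the
            // 16 MB a naive sum of chunk sizes would report.
            long length = SeaweedRead.totalSize(Arrays.asList(first, second));
            assert length == 12 * 1024 * 1024;
        }
    }
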
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
index bf3669d59..edc279adc 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
@@ -5,6 +5,8 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import seaweedfs.client.FilerGrpcClient;
 import seaweedfs.client.FilerProto;
 
@@ -17,6 +19,8 @@ import java.util.Map;
 
 public class SeaweedRead {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
     // returns bytesRead
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
                             final long position, final byte[] buffer, final int bufferOffset,
@@ -54,9 +58,11 @@
                 HttpResponse response = client.execute(request);
                 HttpEntity entity = response.getEntity();
 
-                readCount += entity.getContent().read(buffer,
-                    (int) (chunkView.logicOffset - position),
-                    (int) (chunkView.logicOffset - position + chunkView.size));
+                int len = (int) (chunkView.logicOffset - position + chunkView.size);
+                entity.getContent().read(buffer, bufferOffset, len);
+
+                LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength);
+                readCount += len;
 
             } catch (IOException e) {
                 e.printStackTrace();
@@ -93,11 +99,17 @@
         List<VisibleInterval> newVisibles = new ArrayList<>();
         List<VisibleInterval> visibles = new ArrayList<>();
         for (FilerProto.FileChunk chunk : chunks) {
+            List<VisibleInterval> t = newVisibles;
             newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk);
-            visibles.clear();
-            List<VisibleInterval> t = visibles;
-            visibles = newVisibles;
-            newVisibles = t;
+            if (t != newVisibles) {
+                // visibles are changed in place
+            } else {
+                // newVisibles are modified
+                visibles.clear();
+                t = visibles;
+                visibles = newVisibles;
+                newVisibles = t;
+            }
         }
 
         return visibles;
@@ -168,6 +180,17 @@
         return fileId;
     }
 
+    public static long totalSize(List<FilerProto.FileChunk> chunksList) {
+        long size = 0;
+        for (FilerProto.FileChunk chunk : chunksList) {
+            long t = chunk.getOffset() + chunk.getSize();
+            if (size < t) {
+                size = t;
+            }
+        }
+        return size;
+    }
+
     public static class VisibleInterval {
         long start;
         long stop;
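
Review note: in the new chunk fetch, InputStream.read(byte[], int, int) is not guaranteed to return len bytes in one call, so readCount += len assumes every read comes back complete. If that assumption ever needs to be made explicit, a small helper along these lines could be used (illustrative only, not part of this patch):

    import java.io.IOException;
    import java.io.InputStream;

    final class StreamUtil {
        // Keep reading until `len` bytes land in buffer[offset..offset+len)
        // or the stream ends; java.io.InputStream.read may return fewer bytes
        // than requested on any single call.
        static int readFully(InputStream in, byte[] buffer, int offset, int len) throws IOException {
            int total = 0;
            while (total < len) {
                int n = in.read(buffer, offset + total, len - total);
                if (n < 0) {
                    break; // end of stream before len bytes arrived
                }
                total += n;
            }
            return total;
        }
    }

With such a helper, readCount could reflect the bytes actually copied rather than the requested length.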