From a6d0f962e89ea0188620eac1c71439d43ba6c1e9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jul 2020 20:10:04 -0700 Subject: [PATCH] add buffered seaweed input stream --- .../hdfs/BufferedSeaweedInputStream.java | 49 +++++++++++++++++++ .../java/seaweed/hdfs/SeaweedFileSystem.java | 4 +- 2 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java new file mode 100644 index 000000000..0bee6e43f --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java @@ -0,0 +1,49 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable { + + SeaweedInputStream t; + + protected BufferedSeaweedInputStream(InputStream in, int bufferSize) { + super(new BufferedInputStream(in, bufferSize)); + t = (SeaweedInputStream)in; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + return this.t.read(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + this.t.readFully(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + this.t.readFully(position,buffer); + } + + @Override + public void seek(long pos) throws IOException { + this.t.seek(pos); + } + + @Override + public long getPos() throws IOException { + return this.t.getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return this.t.seekToNewSource(targetPos); + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 2341d335d..35cc54309 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -1,11 +1,11 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null;