diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 35cc54309..585fa39a3 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -1,11 +1,11 @@ package seaweed.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java new file mode 100644 index 000000000..0bee6e43f --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java @@ -0,0 +1,49 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable { + + SeaweedInputStream t; + + protected BufferedSeaweedInputStream(InputStream in, int bufferSize) { + super(new BufferedInputStream(in, bufferSize)); + t = (SeaweedInputStream)in; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) throws IOException { + return this.t.read(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + this.t.readFully(position,buffer,offset,length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + this.t.readFully(position,buffer); + } + + @Override + public void seek(long pos) throws IOException { + this.t.seek(pos); + } + + @Override + public long getPos() throws IOException { + return this.t.getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return this.t.seekToNewSource(targetPos); + } +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 2341d335d..585fa39a3 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null;