Browse Source

Merge pull request #7 from chrislusf/master

sync
pull/1408/head
hilimd 4 years ago
committed by GitHub
parent
commit
d9af6e851b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java
  2. 2
      other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
  3. 49
      other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java
  4. 2
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

49
other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java

@ -0,0 +1,49 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import java.io.BufferedInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable {
SeaweedInputStream t;
protected BufferedSeaweedInputStream(InputStream in, int bufferSize) {
super(new BufferedInputStream(in, bufferSize));
t = (SeaweedInputStream)in;
}
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
return this.t.read(position,buffer,offset,length);
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
this.t.readFully(position,buffer,offset,length);
}
@Override
public void readFully(long position, byte[] buffer) throws IOException {
this.t.readFully(position,buffer);
}
@Override
public void seek(long pos) throws IOException {
this.t.seek(pos);
}
@Override
public long getPos() throws IOException {
return this.t.getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return this.t.seekToNewSource(targetPos);
}
}

2
other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream);
return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;

49
other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedSeaweedInputStream.java

@ -0,0 +1,49 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import java.io.BufferedInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
class BufferedSeaweedInputStream extends FilterInputStream implements Seekable, PositionedReadable {
SeaweedInputStream t;
protected BufferedSeaweedInputStream(InputStream in, int bufferSize) {
super(new BufferedInputStream(in, bufferSize));
t = (SeaweedInputStream)in;
}
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
return this.t.read(position,buffer,offset,length);
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
this.t.readFully(position,buffer,offset,length);
}
@Override
public void readFully(long position, byte[] buffer) throws IOException {
this.t.readFully(position,buffer);
}
@Override
public void seek(long pos) throws IOException {
this.t.seek(pos);
}
@Override
public long getPos() throws IOException {
return this.t.getPos();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return this.t.seekToNewSource(targetPos);
}
}

2
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@ -76,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem {
try { try {
InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
return new FSDataInputStream(inputStream);
return new FSDataInputStream(new BufferedSeaweedInputStream(inputStream, 16 * 1024 * 1024));
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null; return null;

Loading…
Cancel
Save