Browse Source

HDFS: implement ByteBufferReadable

fix https://github.com/chrislusf/seaweedfs/issues/1645
pull/1650/head
Chris Lu 4 years ago
parent
commit
a9efaa6385
  1. 12
      other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
  2. 12
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java

12
other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@ -2,6 +2,7 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
public class SeaweedInputStream extends FSInputStream {
public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
@ -85,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
} }
long bytesRead = 0; long bytesRead = 0;
if (position+len < entry.getContent().size()) {
if (position+len <= entry.getContent().size()) {
entry.getContent().copyTo(b, (int) position, (int) off, len); entry.getContent().copyTo(b, (int) position, (int) off, len);
} else { } else {
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
@ -106,6 +108,12 @@ public class SeaweedInputStream extends FSInputStream {
} }
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
return read(buf.array(), buf.position(), buf.remaining());
}
/** /**
* Seek to given position in stream. * Seek to given position in stream.
* *

12
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@ -2,6 +2,7 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
public class SeaweedInputStream extends FSInputStream {
public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
@ -85,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
} }
long bytesRead = 0; long bytesRead = 0;
if (position+len < entry.getContent().size()) {
if (position+len <= entry.getContent().size()) {
entry.getContent().copyTo(b, (int) position, (int) off, len); entry.getContent().copyTo(b, (int) position, (int) off, len);
} else { } else {
bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
@ -106,6 +108,12 @@ public class SeaweedInputStream extends FSInputStream {
} }
// implement ByteBufferReadable
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
return read(buf.array(), buf.position(), buf.remaining());
}
/** /**
* Seek to given position in stream. * Seek to given position in stream.
* *

Loading…
Cancel
Save