Browse Source

same logic for reading random access files from Go

pull/1427/head
Chris Lu 4 years ago
parent
commit
13bfe5deef
  1. 55
      other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
  2. 4
      other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
  3. 4
      other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
  4. 4
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
  5. 4
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java

55
other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java

@ -23,7 +23,7 @@ public class SeaweedRead {
// returns bytesRead // returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength) throws IOException {
final int bufferLength, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
@ -42,6 +42,14 @@ public class SeaweedRead {
long readCount = 0; long readCount = 0;
int startOffset = bufferOffset; int startOffset = bufferOffset;
for (ChunkView chunkView : chunkViews) { for (ChunkView chunkView : chunkViews) {
if (startOffset < chunkView.logicOffset) {
long gap = chunkView.logicOffset - startOffset;
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
readCount += gap;
startOffset += gap;
}
FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
if (locations == null || locations.getLocationsCount() == 0) { if (locations == null || locations.getLocationsCount() == 0) {
LOG.error("failed to locate {}", chunkView.fileId); LOG.error("failed to locate {}", chunkView.fileId);
@ -51,11 +59,22 @@ public class SeaweedRead {
int len = readChunkView(position, buffer, startOffset, chunkView, locations); int len = readChunkView(position, buffer, startOffset, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
readCount += len; readCount += len;
startOffset += len; startOffset += len;
} }
long limit = Math.min(bufferLength, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
readCount += gap;
startOffset += gap;
}
return readCount; return readCount;
} }
@ -71,7 +90,7 @@ public class SeaweedRead {
int len = (int) chunkView.size; int len = (int) chunkView.size;
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}", LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
System.arraycopy(chunkData, startOffset - (int) (chunkView.logicOffset - chunkView.offset), buffer, startOffset, len);
return len; return len;
} }
@ -93,7 +112,7 @@ public class SeaweedRead {
Header contentEncodingHeader = entity.getContentEncoding(); Header contentEncodingHeader = entity.getContentEncoding();
if (contentEncodingHeader != null) { if (contentEncodingHeader != null) {
HeaderElement[] encodings =contentEncodingHeader.getElements();
HeaderElement[] encodings = contentEncodingHeader.getElements();
for (int i = 0; i < encodings.length; i++) { for (int i = 0; i < encodings.length; i++) {
if (encodings[i].getName().equalsIgnoreCase("gzip")) { if (encodings[i].getName().equalsIgnoreCase("gzip")) {
entity = new GzipDecompressingEntity(entity); entity = new GzipDecompressingEntity(entity);
@ -134,18 +153,19 @@ public class SeaweedRead {
long stop = offset + size; long stop = offset + size;
for (VisibleInterval chunk : visibleIntervals) { for (VisibleInterval chunk : visibleIntervals) {
if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
long chunkStart = Math.max(offset, chunk.start);
long chunkStop = Math.min(stop, chunk.stop);
if (chunkStart < chunkStop) {
boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
views.add(new ChunkView( views.add(new ChunkView(
chunk.fileId, chunk.fileId,
offset - chunk.start,
Math.min(chunk.stop, stop) - offset,
offset,
chunkStart - chunk.start + chunk.chunkOffset,
chunkStop - chunkStart,
chunkStart,
isFullChunk, isFullChunk,
chunk.cipherKey, chunk.cipherKey,
chunk.isCompressed chunk.isCompressed
)); ));
offset = Math.min(chunk.stop, stop);
} }
} }
return views; return views;
@ -160,7 +180,13 @@ public class SeaweedRead {
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
@Override @Override
public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
return (int) (a.getMtime() - b.getMtime());
// if just a.getMtime() - b.getMtime(), it will overflow!
if (a.getMtime() < b.getMtime()) {
return -1;
} else if (a.getMtime() > b.getMtime()) {
return 1;
}
return 0;
} }
}); });
@ -181,6 +207,7 @@ public class SeaweedRead {
chunk.getOffset() + chunk.getSize(), chunk.getOffset() + chunk.getSize(),
chunk.getFileId(), chunk.getFileId(),
chunk.getMtime(), chunk.getMtime(),
0,
true, true,
chunk.getCipherKey().toByteArray(), chunk.getCipherKey().toByteArray(),
chunk.getIsCompressed() chunk.getIsCompressed()
@ -203,6 +230,7 @@ public class SeaweedRead {
chunk.getOffset(), chunk.getOffset(),
v.fileId, v.fileId,
v.modifiedTime, v.modifiedTime,
v.chunkOffset,
false, false,
v.cipherKey, v.cipherKey,
v.isCompressed v.isCompressed
@ -215,6 +243,7 @@ public class SeaweedRead {
v.stop, v.stop,
v.fileId, v.fileId,
v.modifiedTime, v.modifiedTime,
v.chunkOffset + (chunkStop - v.start),
false, false,
v.cipherKey, v.cipherKey,
v.isCompressed v.isCompressed
@ -247,6 +276,10 @@ public class SeaweedRead {
return fileId; return fileId;
} }
public static long fileSize(FilerProto.Entry entry) {
return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
}
public static long totalSize(List<FilerProto.FileChunk> chunksList) { public static long totalSize(List<FilerProto.FileChunk> chunksList) {
long size = 0; long size = 0;
for (FilerProto.FileChunk chunk : chunksList) { for (FilerProto.FileChunk chunk : chunksList) {
@ -263,15 +296,17 @@ public class SeaweedRead {
public final long stop; public final long stop;
public final long modifiedTime; public final long modifiedTime;
public final String fileId; public final String fileId;
public final long chunkOffset;
public final boolean isFullChunk; public final boolean isFullChunk;
public final byte[] cipherKey; public final byte[] cipherKey;
public final boolean isCompressed; public final boolean isCompressed;
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start; this.start = start;
this.stop = stop; this.stop = stop;
this.modifiedTime = modifiedTime; this.modifiedTime = modifiedTime;
this.fileId = fileId; this.fileId = fileId;
this.chunkOffset = chunkOffset;
this.isFullChunk = isFullChunk; this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey; this.cipherKey = cipherKey;
this.isCompressed = isCompressed; this.isCompressed = isCompressed;

4
other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java

@ -124,7 +124,7 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes(); FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = SeaweedRead.totalSize(entry.getChunksList());
long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory(); boolean isDir = entry.getIsDirectory();
int block_replication = 1; int block_replication = 1;
int blocksize = 512; int blocksize = 512;
@ -185,7 +185,7 @@ public class SeaweedFileSystemStore {
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now); entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication(); replication = existingEntry.getAttributes().getReplication();
} }
} }

4
other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
} }
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
if (bytesRead > Integer.MAX_VALUE) { if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length"); throw new IOException("Unexpected Content-Length");
} }

4
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java

@ -124,7 +124,7 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes(); FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = SeaweedRead.totalSize(entry.getChunksList());
long length = SeaweedRead.fileSize(entry);
boolean isDir = entry.getIsDirectory(); boolean isDir = entry.getIsDirectory();
int block_replication = 1; int block_replication = 1;
int blocksize = 512; int blocksize = 512;
@ -185,7 +185,7 @@ public class SeaweedFileSystemStore {
entry.mergeFrom(existingEntry); entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now); entry.getAttributesBuilder().setMtime(now);
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
writePosition = SeaweedRead.fileSize(existingEntry);
replication = existingEntry.getAttributes().getReplication(); replication = existingEntry.getAttributes().getReplication();
} }
} }

4
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java

@ -41,7 +41,7 @@ public class SeaweedInputStream extends FSInputStream {
this.statistics = statistics; this.statistics = statistics;
this.path = path; this.path = path;
this.entry = entry; this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.contentLength = SeaweedRead.fileSize(entry);
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
@ -87,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
} }
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
if (bytesRead > Integer.MAX_VALUE) { if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length"); throw new IOException("Unexpected Content-Length");
} }

Loading…
Cancel
Save