diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java new file mode 100644 index 000000000..3cac46db9 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -0,0 +1,134 @@ +package seaweedfs.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class FileChunkManifest { + + private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class); + + private static final int mergeFactor = 3; + + public static boolean hasChunkManifest(List chunks) { + for (FilerProto.FileChunk chunk : chunks) { + if (chunk.getIsChunkManifest()) { + return true; + } + } + return false; + } + + public static List resolveChunkManifest( + final FilerGrpcClient filerGrpcClient, List chunks) throws IOException { + + List dataChunks = new ArrayList<>(); + + for (FilerProto.FileChunk chunk : chunks) { + if (!chunk.getIsChunkManifest()) { + dataChunks.add(chunk); + continue; + } + + // IsChunkManifest + LOG.debug("fetching chunk manifest:{}", chunk); + byte[] data = fetchChunk(filerGrpcClient, chunk); + FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); + List resolvedChunks = new ArrayList<>(); + for (FilerProto.FileChunk t : m.getChunksList()) { + // avoid deprecated chunk.getFileId() + resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); + } + dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); + } + + return dataChunks; + } + + private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { + + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); + String vid = "" + chunk.getFid().getVolumeId(); + lookupRequest.addVolumeIds(vid); + FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + Map vid2Locations = lookupResponse.getLocationsMapMap(); + FilerProto.Locations locations = vid2Locations.get(vid); + + SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( + FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId() + 0, + -1, + 0, + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed()); + + byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); + if (chunkData == null) { + LOG.debug("doFetchFullChunkData:{}", chunkView); + chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); + } + LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); + SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + + return chunkData; + + } + + public static List maybeManifestize( + final FilerGrpcClient filerGrpcClient, List inputChunks) throws IOException { + // the return variable + List chunks = new ArrayList<>(); + + List dataChunks = new ArrayList<>(); + for (FilerProto.FileChunk chunk : inputChunks) { + if (!chunk.getIsChunkManifest()) { + dataChunks.add(chunk); + } else { + chunks.add(chunk); + } + } + + int remaining = dataChunks.size(); + for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { + FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor)); + chunks.add(chunk); + remaining -= mergeFactor; + } + + // remaining + for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) { + chunks.add(dataChunks.get(i)); + } + return chunks; + } + + private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List dataChunks) throws IOException { + // create and serialize the manifest + FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); + byte[] data = m.build().toByteArray(); + + long minOffset = Long.MAX_VALUE; + long maxOffset = -1; + for (FilerProto.FileChunk chunk : dataChunks) { + minOffset = Math.min(minOffset, chunk.getOffset()); + maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset()); + } + + FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( + filerGrpcClient.getReplication(), + filerGrpcClient, + minOffset, + data, 0, data.length); + manifestChunk.setIsChunkManifest(true); + manifestChunk.setSize(maxOffset - minOffset); + return manifestChunk.build(); + + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 2103fc699..0d087d5b4 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -24,6 +24,10 @@ public class FilerClient { this.filerGrpcClient = filerGrpcClient; } + public static String toFileId(FilerProto.FileId fid) { + return String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); + } + public boolean mkdirs(String path, int mode) { String currentUser = System.getProperty("user.name"); return mkdirs(path, mode, 0, 0, currentUser, new String[]{}); @@ -209,7 +213,6 @@ public class FilerClient { } } - public boolean createEntry(String parent, FilerProto.Entry entry) { try { filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() @@ -279,9 +282,7 @@ public class FilerClient { entryBuilder.clearChunks(); for (FilerProto.FileChunk chunk : entry.getChunksList()) { FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); - FilerProto.FileId fid = chunk.getFid(); - fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); - chunkBuilder.setFileId(fileId); + chunkBuilder.setFileId(toFileId(chunk.getFid())); entryBuilder.addChunks(chunkBuilder); } return entryBuilder.build(); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 301919919..fbc3296ae 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -2,16 +2,12 @@ package seaweedfs.client; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.util.*; @@ -77,7 +73,7 @@ public class SeaweedRead { return len; } - private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -138,7 +134,11 @@ public class SeaweedRead { return views; } - public static List nonOverlappingVisibleIntervals(List chunkList) { + public static List nonOverlappingVisibleIntervals( + final FilerGrpcClient filerGrpcClient, List chunkList) throws IOException { + + chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList); + FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); Arrays.sort(chunks, new Comparator() { @Override diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index e9819668c..fd54453a1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,8 +1,6 @@ package seaweedfs.client; import com.google.protobuf.ByteString; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; @@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.util.EntityUtils; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.security.SecureRandom; +import java.util.List; public class SeaweedWrite { @@ -25,6 +23,17 @@ public class SeaweedWrite { final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength) throws IOException { + synchronized (entry) { + entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength)); + } + } + + public static FilerProto.FileChunk.Builder writeChunk(final String replication, + final FilerGrpcClient filerGrpcClient, + final long offset, + final byte[] bytes, + final long bytesOffset, + final long bytesLength) throws IOException { FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() .setCollection(filerGrpcClient.getCollection()) @@ -46,25 +55,28 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - synchronized (entry) { - entry.addChunks(FilerProto.FileChunk.newBuilder() - .setFileId(fileId) - .setOffset(offset) - .setSize(bytesLength) - .setMtime(System.currentTimeMillis() / 10000L) - .setETag(etag) - .setCipherKey(cipherKeyString) - ); - } - // cache fileId ~ bytes SeaweedRead.chunkCache.setChunk(fileId, bytes); + return FilerProto.FileChunk.newBuilder() + .setFileId(fileId) + .setOffset(offset) + .setSize(bytesLength) + .setMtime(System.currentTimeMillis() / 10000L) + .setETag(etag) + .setCipherKey(cipherKeyString); } public static void writeMeta(final FilerGrpcClient filerGrpcClient, - final String parentDirectory, final FilerProto.Entry.Builder entry) { + final String parentDirectory, + final FilerProto.Entry.Builder entry) throws IOException { + + int chunkSize = entry.getChunksCount(); + List chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList()); + synchronized (entry) { + entry.clearChunks(); + entry.addAllChunks(chunks); filerGrpcClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java index ccfcdb117..44b833c90 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -3,13 +3,14 @@ package seaweedfs.client; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; public class SeaweedReadTest { @Test - public void testNonOverlappingVisibleIntervals() { + public void testNonOverlappingVisibleIntervals() throws IOException { List chunks = new ArrayList<>(); chunks.add(FilerProto.FileChunk.newBuilder() .setFileId("aaa") @@ -24,7 +25,7 @@ public class SeaweedReadTest { .setMtime(2000) .build()); - List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); + List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks); for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { System.out.println("visible:" + visibleInterval); } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index c26ad728f..6b3c72f7d 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream { final Statistics statistics, final String path, final FilerProto.Entry entry, - final int bufferSize) { + final int bufferSize) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; @@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream { this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream { } } - return (int)bytesRead; + return (int) bytesRead; } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 46de0c443..209e32d0b 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream { break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")"); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); outputIndex += writableBytes; currentOffset += writableBytes; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index c26ad728f..10ec9b3cc 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -45,7 +45,7 @@ public class SeaweedInputStream extends FSInputStream { this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); this.bufferSize = bufferSize; - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);