From 15c60cbb2660d60d2714d3386731044fcfeac473 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 7 Feb 2021 03:50:01 -0800 Subject: [PATCH 1/5] close the grpc connection after 10 hours related to https://github.com/chrislusf/seaweedfs/issues/1782 --- weed/pb/grpc_client_server.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 4d78d769f..9ffda9b04 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -35,8 +35,9 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { var options []grpc.ServerOption options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, // wait time before ping if no activity - Timeout: 20 * time.Second, // ping timeout + Time: 10 * time.Second, // wait time before ping if no activity + Timeout: 20 * time.Second, // ping timeout + MaxConnectionAge: 10 * time.Hour, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 60 * time.Second, // min time a client should wait before sending a ping From 5e4b5109ddaff1fc7c09cc107081c04aaf0056f6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 7 Feb 2021 22:29:43 -0800 Subject: [PATCH 2/5] 2.24 --- k8s/seaweedfs/Chart.yaml | 4 ++-- k8s/seaweedfs/values.yaml | 2 +- weed/util/constants.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 4febe5129..b890cc666 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "2.23" -version: 2.23 +appVersion: "2.24" +version: 2.24 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 168c0dbcd..2b33a6149 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - # imageTag: "2.23" - started using {.Chart.appVersion} + # imageTag: "2.24" - started using {.Chart.appVersion} imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/util/constants.go b/weed/util/constants.go index ccc0ef1fb..6001ae78e 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 23) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 24) COMMIT = "" ) From a83302113214748fe3600510c8835b6d88f3496c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 7 Feb 2021 23:03:03 -0800 Subject: [PATCH 3/5] fix refactoring left over --- weed/storage/types/offset_5bytes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go index 05c6d2f39..b6181fc11 100644 --- a/weed/storage/types/offset_5bytes.go +++ b/weed/storage/types/offset_5bytes.go @@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset { } } -func (offset Offset) ToAcutalOffset() (actualOffset int64) { +func (offset Offset) ToActualOffset() (actualOffset int64) { return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize) } From ad36c7b0d76f870a5cb0c7c68f5c81ec5340a79e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 8 Feb 2021 02:28:45 -0800 Subject: [PATCH 4/5] refactoring: only expose FilerClient class --- .../seaweedfs/client/FileChunkManifest.java | 26 +++++++++---------- .../java/seaweedfs/client/FilerClient.java | 24 +++++++---------- .../seaweedfs/client/SeaweedInputStream.java | 17 ++++++------ .../seaweedfs/client/SeaweedOutputStream.java | 18 ++++++------- .../java/seaweedfs/client/SeaweedRead.java | 18 ++++++------- .../java/seaweedfs/client/SeaweedWrite.java | 22 ++++++++-------- .../seaweedfs/examples/ExampleReadFile.java | 6 ++--- .../seaweedfs/examples/ExampleWriteFile.java | 13 +++++----- .../seaweed/hdfs/SeaweedFileSystemStore.java | 16 +++++------- .../hdfs/SeaweedHadoopInputStream.java | 6 ++--- .../hdfs/SeaweedHadoopOutputStream.java | 6 ++--- .../seaweed/hdfs/SeaweedFileSystemStore.java | 16 +++++------- .../hdfs/SeaweedHadoopInputStream.java | 6 ++--- .../hdfs/SeaweedHadoopOutputStream.java | 6 ++--- 14 files changed, 94 insertions(+), 106 deletions(-) diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 3d7da91d5..9b6ba5dfc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -23,7 +23,7 @@ public class FileChunkManifest { } public static List resolveChunkManifest( - final FilerGrpcClient filerGrpcClient, List chunks) throws IOException { + final FilerClient filerClient, List chunks) throws IOException { List dataChunks = new ArrayList<>(); @@ -35,30 +35,30 @@ public class FileChunkManifest { // IsChunkManifest LOG.debug("fetching chunk manifest:{}", chunk); - byte[] data = fetchChunk(filerGrpcClient, chunk); + byte[] data = fetchChunk(filerClient, chunk); FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); List resolvedChunks = new ArrayList<>(); for (FilerProto.FileChunk t : m.getChunksList()) { // avoid deprecated chunk.getFileId() resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); } - dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); + dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks)); } return dataChunks; } - private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { + private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException { String vid = "" + chunk.getFid().getVolumeId(); - FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid); + FilerProto.Locations locations = filerClient.vidLocations.get(vid); if (locations == null) { FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); lookupRequest.addVolumeIds(vid); - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); locations = lookupResponse.getLocationsMapMap().get(vid); - filerGrpcClient.vidLocations.put(vid, locations); + filerClient.vidLocations.put(vid, locations); LOG.debug("fetchChunk vid:{} locations:{}", vid, locations); } @@ -74,7 +74,7 @@ public class FileChunkManifest { byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { LOG.debug("doFetchFullChunkData:{}", chunkView); - chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations); + chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations); } if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); @@ -86,7 +86,7 @@ public class FileChunkManifest { } public static List maybeManifestize( - final FilerGrpcClient filerGrpcClient, List inputChunks, String parentDirectory) throws IOException { + final FilerClient filerClient, List inputChunks, String parentDirectory) throws IOException { // the return variable List chunks = new ArrayList<>(); @@ -101,7 +101,7 @@ public class FileChunkManifest { int remaining = dataChunks.size(); for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { - FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); + FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); chunks.add(chunk); remaining -= mergeFactor; } @@ -113,7 +113,7 @@ public class FileChunkManifest { return chunks; } - private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List dataChunks, String parentDirectory) throws IOException { + private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List dataChunks, String parentDirectory) throws IOException { // create and serialize the manifest dataChunks = FilerClient.beforeEntrySerialization(dataChunks); FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); @@ -127,8 +127,8 @@ public class FileChunkManifest { } FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( - filerGrpcClient.getReplication(), - filerGrpcClient, + filerClient.getReplication(), + filerClient, minOffset, data, 0, data.length, parentDirectory); manifestChunk.setIsChunkManifest(true); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 7338d5bee..58269d41f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -11,18 +11,12 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -public class FilerClient { +public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private final FilerGrpcClient filerGrpcClient; - public FilerClient(String host, int grpcPort) { - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - } - - public FilerClient(FilerGrpcClient filerGrpcClient) { - this.filerGrpcClient = filerGrpcClient; + super(host, grpcPort); } public static String toFileId(FilerProto.FileId fid) { @@ -236,7 +230,7 @@ public class FilerClient { } public List listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { - Iterator iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + Iterator iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) @@ -253,7 +247,7 @@ public class FilerClient { public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry( FilerProto.LookupDirectoryEntryRequest.newBuilder() .setDirectory(directory) .setName(entryName) @@ -274,7 +268,7 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { FilerProto.CreateEntryResponse createEntryResponse = - filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -291,7 +285,7 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() + this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -304,7 +298,7 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() .setDirectory(parent) .setName(entryName) .setIsDeleteData(isDeleteFileChunk) @@ -320,7 +314,7 @@ public class FilerClient { public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { try { - filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() .setOldDirectory(oldParent) .setOldName(oldName) .setNewDirectory(newParent) @@ -334,7 +328,7 @@ public class FilerClient { } public Iterator watch(String prefix, String clientName, long sinceNs) { - return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() .setPathPrefix(prefix) .setClientName(clientName) .setSinceNs(sinceNs) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 8b26c242c..4e40ce1b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -16,7 +16,7 @@ public class SeaweedInputStream extends InputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); - private final FilerGrpcClient filerGrpcClient; + private final FilerClient filerClient; private final String path; private final FilerProto.Entry entry; private final List visibleIntervalList; @@ -27,32 +27,31 @@ public class SeaweedInputStream extends InputStream { private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final String fullpath) throws IOException { - this.filerGrpcClient = filerGrpcClient; this.path = fullpath; - FilerClient filerClient = new FilerClient(filerGrpcClient); + this.filerClient = filerClient; this.entry = filerClient.lookupEntry( SeaweedOutputStream.getParentDirectory(fullpath), SeaweedOutputStream.getFileName(fullpath)); this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); } public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final String path, final FilerProto.Entry entry) throws IOException { - this.filerGrpcClient = filerGrpcClient; + this.filerClient = filerClient; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -110,7 +109,7 @@ public class SeaweedInputStream extends InputStream { if (start+len <= entry.getContent().size()) { entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 92dc59f61..b73e99e69 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -15,7 +15,7 @@ public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); protected final boolean supportFlush = true; - private final FilerGrpcClient filerGrpcClient; + private final FilerClient filerClient; private final String path; private final int bufferSize; private final int maxConcurrentRequestCount; @@ -33,17 +33,17 @@ public class SeaweedOutputStream extends OutputStream { private long outputIndex; private String replication = "000"; - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { - this(filerGrpcClient, fullpath, "000"); + public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { + this(filerClient, fullpath, "000"); } - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { - this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); + public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { + this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); } - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, + public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { - this.filerGrpcClient = filerGrpcClient; + this.filerClient = filerClient; this.replication = replication; this.path = path; this.position = position; @@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); } @@ -225,7 +225,7 @@ public class SeaweedOutputStream extends OutputStream { } final Future job = completionService.submit(() -> { // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); + SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index e55c5b7aa..384636601 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -23,7 +23,7 @@ public class SeaweedRead { static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); // returns bytesRead - public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, + public static long read(FilerClient filerClient, List visibleIntervals, final long position, final ByteBuffer buf, final long fileSize) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); @@ -42,7 +42,7 @@ public class SeaweedRead { } if (lookupRequest.getVolumeIdsCount() > 0) { - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); Map vid2Locations = lookupResponse.getLocationsMapMap(); for (Map.Entry entry : vid2Locations.entrySet()) { @@ -71,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations); + int len = readChunkView(filerClient, startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -93,12 +93,12 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { - chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations); + chunkData = doFetchFullChunkData(filerClient, chunkView, locations); chunkCache.setChunk(chunkView.fileId, chunkData); } @@ -110,13 +110,13 @@ public class SeaweedRead { return len; } - public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { - String url = filerGrpcClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); + String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); try { data = doFetchOneFullChunkData(chunkView, url); lastException = null; @@ -221,9 +221,9 @@ public class SeaweedRead { } public static List nonOverlappingVisibleIntervals( - final FilerGrpcClient filerGrpcClient, List chunkList) throws IOException { + final FilerClient filerClient, List chunkList) throws IOException { - chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList); + chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); Arrays.sort(chunks, new Comparator() { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index db3cc3931..f8c0c76b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -23,29 +23,29 @@ public class SeaweedWrite { public static void writeData(FilerProto.Entry.Builder entry, final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path); + replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); synchronized (entry) { entry.addChunks(chunkBuilder); } } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(filerGrpcClient.getCollection()) - .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) + .setCollection(filerClient.getCollection()) + .setReplication(replication == null ? filerClient.getReplication() : replication) .setDataCenter("") .setTtlSec(0) .setPath(path) @@ -53,11 +53,11 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerGrpcClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; - if (filerGrpcClient.isCipher()) { + if (filerClient.isCipher()) { cipherKey = genCipherKey(); cipherKeyString = ByteString.copyFrom(cipherKey); } @@ -75,15 +75,15 @@ public class SeaweedWrite { .setCipherKey(cipherKeyString); } - public static void writeMeta(final FilerGrpcClient filerGrpcClient, + public static void writeMeta(final FilerClient filerClient, final String parentDirectory, final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory); + List chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); entry.clearChunks(); entry.addAllChunks(chunks); - filerGrpcClient.getBlockingStub().createEntry( + filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java index bd73df802..d2eb94135 100644 --- a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java @@ -1,6 +1,6 @@ package com.seaweedfs.examples; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.SeaweedInputStream; import java.io.FileInputStream; @@ -13,7 +13,7 @@ public class ExampleReadFile { public static void main(String[] args) throws IOException { - FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + FilerClient filerClient = new FilerClient("localhost", 18888); long startTime = System.currentTimeMillis(); parseZip("/Users/chris/tmp/test.zip"); @@ -23,7 +23,7 @@ public class ExampleReadFile { long localProcessTime = startTime2 - startTime; SeaweedInputStream seaweedInputStream = new SeaweedInputStream( - filerGrpcClient, "/test.zip"); + filerClient, "/test.zip"); parseZip(seaweedInputStream); long swProcessTime = System.currentTimeMillis() - startTime2; diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java index 228a3c0b7..26b74028f 100644 --- a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java @@ -1,6 +1,6 @@ package com.seaweedfs.examples; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.SeaweedInputStream; import seaweedfs.client.SeaweedOutputStream; @@ -13,15 +13,14 @@ public class ExampleWriteFile { public static void main(String[] args) throws IOException { - FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + FilerClient filerClient = new FilerClient("localhost", 18888); - SeaweedInputStream seaweedInputStream = new SeaweedInputStream( - filerGrpcClient, "/test.zip"); - unZipFiles(filerGrpcClient, seaweedInputStream); + SeaweedInputStream seaweedInputStream = new SeaweedInputStream(filerClient, "/test.zip"); + unZipFiles(filerClient, seaweedInputStream); } - public static void unZipFiles(FilerGrpcClient filerGrpcClient, InputStream is) throws IOException { + public static void unZipFiles(FilerClient filerClient, InputStream is) throws IOException { ZipInputStream zin = new ZipInputStream(is); ZipEntry ze; while ((ze = zin.getNextEntry()) != null) { @@ -34,7 +33,7 @@ public class ExampleWriteFile { continue; } - SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerGrpcClient, "/test/"+filename); + SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/"+filename); byte[] bytesIn = new byte[16 * 1024]; int read = 0; while ((read = zin.read(bytesIn))!=-1) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 8147f3efe..f4e8c9349 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -24,27 +24,25 @@ public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; private Configuration conf; public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - filerClient = new FilerClient(filerGrpcClient); + filerClient = new FilerClient(host, grpcPort); this.conf = conf; String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); if (volumeServerAccessMode.equals("publicUrl")) { - filerGrpcClient.setAccessVolumeServerByPublicUrl(); + filerClient.setAccessVolumeServerByPublicUrl(); } else if (volumeServerAccessMode.equals("filerProxy")) { - filerGrpcClient.setAccessVolumeServerByFilerProxy(); + filerClient.setAccessVolumeServerByFilerProxy(); } } public void close() { try { - this.filerGrpcClient.shutdown(); + this.filerClient.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -219,10 +217,10 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } - return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication); + return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); } @@ -236,7 +234,7 @@ public class SeaweedFileSystemStore { throw new FileNotFoundException("read non-exist file " + path); } - return new SeaweedHadoopInputStream(filerGrpcClient, + return new SeaweedHadoopInputStream(filerClient, statistics, path.toUri().getPath(), entry); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java index dd9bf4032..f26eae597 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java @@ -5,7 +5,7 @@ package seaweed.hdfs; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedInputStream; @@ -19,11 +19,11 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe private final Statistics statistics; public SeaweedHadoopInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final Statistics statistics, final String path, final FilerProto.Entry entry) throws IOException { - this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry); + this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); this.statistics = statistics; } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index f7a6225d8..da5b56bbc 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -2,15 +2,15 @@ package seaweed.hdfs; // adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedOutputStream; public class SeaweedHadoopOutputStream extends SeaweedOutputStream { - public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { - super(filerGrpcClient, path.toString(), entry, position, bufferSize, replication); + super(filerClient, path, entry, position, bufferSize, replication); } } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 8147f3efe..f4e8c9349 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -24,27 +24,25 @@ public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; private Configuration conf; public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - filerClient = new FilerClient(filerGrpcClient); + filerClient = new FilerClient(host, grpcPort); this.conf = conf; String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); if (volumeServerAccessMode.equals("publicUrl")) { - filerGrpcClient.setAccessVolumeServerByPublicUrl(); + filerClient.setAccessVolumeServerByPublicUrl(); } else if (volumeServerAccessMode.equals("filerProxy")) { - filerGrpcClient.setAccessVolumeServerByFilerProxy(); + filerClient.setAccessVolumeServerByFilerProxy(); } } public void close() { try { - this.filerGrpcClient.shutdown(); + this.filerClient.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -219,10 +217,10 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } - return new SeaweedHadoopOutputStream(filerGrpcClient, path.toString(), entry, writePosition, bufferSize, replication); + return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); } @@ -236,7 +234,7 @@ public class SeaweedFileSystemStore { throw new FileNotFoundException("read non-exist file " + path); } - return new SeaweedHadoopInputStream(filerGrpcClient, + return new SeaweedHadoopInputStream(filerClient, statistics, path.toUri().getPath(), entry); diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java index dd9bf4032..f26eae597 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java @@ -5,7 +5,7 @@ package seaweed.hdfs; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedInputStream; @@ -19,11 +19,11 @@ public class SeaweedHadoopInputStream extends FSInputStream implements ByteBuffe private final Statistics statistics; public SeaweedHadoopInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final Statistics statistics, final String path, final FilerProto.Entry entry) throws IOException { - this.seaweedInputStream = new SeaweedInputStream(filerGrpcClient, path, entry); + this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); this.statistics = statistics; } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java index f65aef619..1740312fe 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -4,7 +4,7 @@ package seaweed.hdfs; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.FilerClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedOutputStream; @@ -13,9 +13,9 @@ import java.util.Locale; public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities { - public SeaweedHadoopOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { - super(filerGrpcClient, path, entry, position, bufferSize, replication); + super(filerClient, path, entry, position, bufferSize, replication); } /** From 5b1def9080b44913280af52821798a9850b42eff Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 8 Feb 2021 02:42:01 -0800 Subject: [PATCH 5/5] Java: 1.6.1 refacoring API --- other/java/client/pom.xml | 2 +- other/java/client/pom.xml.deploy | 2 +- other/java/client/pom_debug.xml | 2 +- other/java/examples/pom.xml | 4 ++-- other/java/hdfs2/dependency-reduced-pom.xml | 2 +- other/java/hdfs2/pom.xml | 2 +- other/java/hdfs3/dependency-reduced-pom.xml | 2 +- other/java/hdfs3/pom.xml | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index aaffe0aa8..056904ebe 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.0 + 1.6.1 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index f32cdc427..69b900017 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.0 + 1.6.1 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 06b8e0ed5..1447401b7 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.6.0 + 1.6.1 org.sonatype.oss diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index eb1e73a92..2456113d0 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -11,13 +11,13 @@ com.github.chrislusf seaweedfs-client - 1.6.0 + 1.6.1 compile com.github.chrislusf seaweedfs-hadoop2-client - 1.6.0 + 1.6.1 compile diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 10b95db45..0680d86bb 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -301,7 +301,7 @@ - 1.6.0 + 1.6.1 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 48b7750f4..897477066 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.6.0 + 1.6.1 2.9.2 diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 71a2f739d..2b4a1a494 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -309,7 +309,7 @@ - 1.6.0 + 1.6.1 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index ffe2d319a..49ff8f926 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.6.0 + 1.6.1 3.1.1