diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 88c7cefbe..7f5ddd6db 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -25,13 +25,13 @@ public class SeaweedWrite { private static final SecureRandom random = new SecureRandom(); public static void writeData(FilerProto.Entry.Builder entry, - final String replication, - String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - final String path) throws IOException { + final String replication, + String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + final String path) throws IOException { IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { @@ -60,21 +60,50 @@ public class SeaweedWrite { } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, - final long bytesLength, - final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) - .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) - .setDataCenter("") - .setTtlSec(0) - .setPath(path) - .build()); + final String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, + final long bytesLength, + final String path) throws IOException { + + // Retry assignVolume call for transient network/server errors + FilerProto.AssignVolumeResponse response = null; + IOException lastException = null; + int maxRetries = 3; + + for (int attempt = 0; attempt < maxRetries; attempt++) { + try { + response = filerClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection( + Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) + .setReplication( + Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) + .setDataCenter("") + .setTtlSec(0) + .setPath(path) + .build()); + break; // Success, exit retry loop + } catch (io.grpc.StatusRuntimeException e) { + lastException = new IOException( + "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e); + if (attempt < maxRetries - 1) { + try { + Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during retry", ie); + } + } + } + } + + if (response == null) { + throw lastException != null ? lastException + : new IOException("assignVolume failed after " + maxRetries + " attempts"); + } if (!Strings.isNullOrEmpty(response.getError())) { throw new IOException(response.getError()); @@ -83,7 +112,8 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), + response.getLocation().getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; @@ -106,27 +136,27 @@ public class SeaweedWrite { } public static void writeMeta(final FilerClient filerClient, - final String parentDirectory, - final FilerProto.Entry.Builder entry) throws IOException { + final String parentDirectory, + final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); + List chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), + parentDirectory); entry.clearChunks(); entry.addAllChunks(chunks); filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) - .build() - ); + .build()); } } private static String multipartUpload(String targetUrl, - String auth, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - byte[] cipherKey) throws IOException { + String auth, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { MessageDigest md = null; try { md = MessageDigest.getInstance("MD5"); @@ -162,7 +192,8 @@ public class SeaweedWrite { try { if (response.getStatusLine().getStatusCode() / 100 != 2) { - if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) { + if (response.getEntity().getContentType() != null + && response.getEntity().getContentType().getValue().equals("application/json")) { throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8")); } else { throw new IOException(response.getStatusLine().getReasonPhrase());