Browse Source

Retry logic

pull/7526/head
chrislu 4 weeks ago
parent
commit
ed21e2d806
  1. 97
      other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java

97
other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java

@ -25,13 +25,13 @@ public class SeaweedWrite {
private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength,
final String path) throws IOException {
final String replication,
String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength,
final String path) throws IOException {
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
@ -60,21 +60,50 @@ public class SeaweedWrite {
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
final String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset,
final long bytesLength,
final String path) throws IOException {
FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
.setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)
.setPath(path)
.build());
final String collection,
final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset,
final long bytesLength,
final String path) throws IOException {
// Retry assignVolume call for transient network/server errors
FilerProto.AssignVolumeResponse response = null;
IOException lastException = null;
int maxRetries = 3;
for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
.setCollection(
Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
.setReplication(
Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)
.setPath(path)
.build());
break; // Success, exit retry loop
} catch (io.grpc.StatusRuntimeException e) {
lastException = new IOException(
"assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e);
if (attempt < maxRetries - 1) {
try {
Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during retry", ie);
}
}
}
}
if (response == null) {
throw lastException != null ? lastException
: new IOException("assignVolume failed after " + maxRetries + " attempts");
}
if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(response.getError());
@ -83,7 +112,8 @@ public class SeaweedWrite {
String fileId = response.getFileId();
String auth = response.getAuth();
String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl());
String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(),
response.getLocation().getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
@ -106,27 +136,27 @@ public class SeaweedWrite {
}
public static void writeMeta(final FilerClient filerClient,
final String parentDirectory,
final FilerProto.Entry.Builder entry) throws IOException {
final String parentDirectory,
final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) {
List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(),
parentDirectory);
entry.clearChunks();
entry.addAllChunks(chunks);
filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
.setEntry(entry)
.build()
);
.build());
}
}
private static String multipartUpload(String targetUrl,
String auth,
final byte[] bytes,
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
String auth,
final byte[] bytes,
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
MessageDigest md = null;
try {
md = MessageDigest.getInstance("MD5");
@ -162,7 +192,8 @@ public class SeaweedWrite {
try {
if (response.getStatusLine().getStatusCode() / 100 != 2) {
if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) {
if (response.getEntity().getContentType() != null
&& response.getEntity().getContentType().getValue().equals("application/json")) {
throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8"));
} else {
throw new IOException(response.getStatusLine().getReasonPhrase());

Loading…
Cancel
Save