From 7d426d2a56521f2053f1611c084416534e982e12 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 30 Mar 2026 13:32:31 -0700 Subject: [PATCH] Retry uploader on volume full (#8853) * retry uploader on volume full * drop unused upload retry helper --- weed/operation/upload_content.go | 63 ++++++++----- weed/operation/upload_content_test.go | 124 ++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 24 deletions(-) create mode 100644 weed/operation/upload_content_test.go diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 44f494450..9e7b6c4e8 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -97,6 +97,13 @@ var ( once sync.Once ) +var uploadRetryableAssignErrList = []string{ + "transport", + "is read only", + "failed to write to local disk", + "Volume Size ", +} + // HTTPClient interface for testing type HTTPClient interface { Do(req *http.Request) (*http.Response, error) @@ -128,6 +135,34 @@ func newUploader(httpClient HTTPClient) *Uploader { } } +func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, host string, auth security.EncodedJwt, err error), uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, data []byte) (fileId string, uploadResult *UploadResult, err error) { + doUploadFunc := func() error { + var host string + var auth security.EncodedJwt + fileId, host, auth, err = assignFn() + if err != nil { + return err + } + + uploadOption.UploadUrl = genFileUrlFn(host, fileId) + uploadOption.Jwt = auth + + uploadResult, err = uploader.retriedUploadData(context.Background(), data, uploadOption) + return err + } + + if uploadOption.RetryForever { + util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) { + glog.V(0).Infof("upload content: %v", err) + return true + }) + } else { + err = util.MultiRetry("uploadWithRetry", uploadRetryableAssignErrList, doUploadFunc) + } + + return +} + // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { @@ -144,11 +179,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl) } - doUploadFunc := func() error { - - var host string - var auth security.EncodedJwt - + fileId, uploadResult, err = uploader.uploadWithRetryData(func() (fileId string, host string, auth security.EncodedJwt, err error) { // grpc assign volume if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, assignErr := client.AssignVolume(context.Background(), assignRequest) @@ -166,26 +197,10 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi return nil }); grpcAssignErr != nil { - return fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr) + err = fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr) } - - uploadOption.UploadUrl = genFileUrlFn(host, fileId) - uploadOption.Jwt = auth - - var uploadErr error - uploadResult, uploadErr = uploader.retriedUploadData(context.Background(), data, uploadOption) - return uploadErr - } - if uploadOption.RetryForever { - util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) { - glog.V(0).Infof("upload content: %v", err) - return true - }) - } else { - uploadErrList := []string{"transport", "is read only"} - err = util.MultiRetry("uploadWithRetry", uploadErrList, doUploadFunc) - } - + return + }, uploadOption, genFileUrlFn, data) return } diff --git a/weed/operation/upload_content_test.go b/weed/operation/upload_content_test.go new file mode 100644 index 000000000..501d6c318 --- /dev/null +++ b/weed/operation/upload_content_test.go @@ -0,0 +1,124 @@ +package operation + +import ( + "fmt" + "io" + "net/http" + "strings" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/security" +) + +type scriptedHTTPResponse struct { + status int + body string +} + +type scriptedHTTPClient struct { + mu sync.Mutex + responses map[string][]scriptedHTTPResponse + calls []string +} + +func testIsUploadRetryableAssignError(err error) bool { + if err == nil { + return false + } + for _, retryable := range uploadRetryableAssignErrList { + if strings.Contains(err.Error(), retryable) { + return true + } + } + return false +} + +func (c *scriptedHTTPClient) Do(req *http.Request) (*http.Response, error) { + c.mu.Lock() + defer c.mu.Unlock() + + url := req.URL.String() + c.calls = append(c.calls, url) + + plans := c.responses[url] + if len(plans) == 0 { + return nil, fmt.Errorf("unexpected request to %s", url) + } + plan := plans[0] + c.responses[url] = plans[1:] + + return &http.Response{ + StatusCode: plan.status, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(plan.body)), + }, nil +} + +func TestIsUploadRetryableAssignError(t *testing.T) { + testCases := []struct { + name string + err error + want bool + }{ + {name: "nil", err: nil, want: false}, + {name: "transport", err: fmt.Errorf("transport is closing"), want: true}, + {name: "read only", err: fmt.Errorf("volume 1 is read only"), want: true}, + {name: "volume full", err: fmt.Errorf("failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"), want: true}, + {name: "other permanent", err: fmt.Errorf("mismatching cookie"), want: false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if got := testIsUploadRetryableAssignError(tc.err); got != tc.want { + t.Fatalf("testIsUploadRetryableAssignError(%v) = %v, want %v", tc.err, got, tc.want) + } + }) + } +} + +func TestUploadWithRetryDataReassignsOnVolumeSizeExceeded(t *testing.T) { + httpClient := &scriptedHTTPClient{ + responses: map[string][]scriptedHTTPResponse{ + "http://volume-a/1,first": { + {status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`}, + {status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`}, + {status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`}, + }, + "http://volume-b/2,second": { + {status: http.StatusCreated, body: `{"name":"test.bin","size":3}`}, + }, + }, + } + uploader := newUploader(httpClient) + + assignCalls := 0 + fileID, uploadResult, err := uploader.uploadWithRetryData(func() (string, string, security.EncodedJwt, error) { + assignCalls++ + if assignCalls == 1 { + return "1,first", "volume-a", "", nil + } + return "2,second", "volume-b", "", nil + }, &UploadOption{Filename: "test.bin"}, func(host, fileId string) string { + return "http://" + host + "/" + fileId + }, []byte("abc")) + + if err != nil { + t.Fatalf("expected success after reassignment, got %v", err) + } + if fileID != "2,second" { + t.Fatalf("expected second file id, got %s", fileID) + } + if assignCalls != 2 { + t.Fatalf("expected 2 assign attempts, got %d", assignCalls) + } + if uploadResult == nil || uploadResult.Name != "test.bin" { + t.Fatalf("expected successful upload result, got %#v", uploadResult) + } + if len(httpClient.calls) != 4 { + t.Fatalf("expected 4 upload attempts (3 same-url retries + 1 reassigned upload), got %d", len(httpClient.calls)) + } + if httpClient.calls[0] != "http://volume-a/1,first" || httpClient.calls[3] != "http://volume-b/2,second" { + t.Fatalf("unexpected upload call sequence: %#v", httpClient.calls) + } +}