Browse Source

Retry uploader on volume full (#8853)

* retry uploader on volume full

* drop unused upload retry helper
master
Chris Lu 6 hours ago
committed by GitHub
parent
commit
7d426d2a56
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 63
      weed/operation/upload_content.go
  2. 124
      weed/operation/upload_content_test.go

63
weed/operation/upload_content.go

@ -97,6 +97,13 @@ var (
once sync.Once
)
var uploadRetryableAssignErrList = []string{
"transport",
"is read only",
"failed to write to local disk",
"Volume Size ",
}
// HTTPClient interface for testing
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
@ -128,6 +135,34 @@ func newUploader(httpClient HTTPClient) *Uploader {
}
}
func (uploader *Uploader) uploadWithRetryData(assignFn func() (fileId string, host string, auth security.EncodedJwt, err error), uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, data []byte) (fileId string, uploadResult *UploadResult, err error) {
doUploadFunc := func() error {
var host string
var auth security.EncodedJwt
fileId, host, auth, err = assignFn()
if err != nil {
return err
}
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
uploadOption.Jwt = auth
uploadResult, err = uploader.retriedUploadData(context.Background(), data, uploadOption)
return err
}
if uploadOption.RetryForever {
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err)
return true
})
} else {
err = util.MultiRetry("uploadWithRetry", uploadRetryableAssignErrList, doUploadFunc)
}
return
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
@ -144,11 +179,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
glog.V(4).Infof("upload read %d bytes from %s", len(data), uploadOption.SourceUrl)
}
doUploadFunc := func() error {
var host string
var auth security.EncodedJwt
fileId, uploadResult, err = uploader.uploadWithRetryData(func() (fileId string, host string, auth security.EncodedJwt, err error) {
// grpc assign volume
if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
@ -166,26 +197,10 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
return nil
}); grpcAssignErr != nil {
return fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr)
err = fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr)
}
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
uploadOption.Jwt = auth
var uploadErr error
uploadResult, uploadErr = uploader.retriedUploadData(context.Background(), data, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err)
return true
})
} else {
uploadErrList := []string{"transport", "is read only"}
err = util.MultiRetry("uploadWithRetry", uploadErrList, doUploadFunc)
}
return
}, uploadOption, genFileUrlFn, data)
return
}

124
weed/operation/upload_content_test.go

@ -0,0 +1,124 @@
package operation
import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"testing"
"github.com/seaweedfs/seaweedfs/weed/security"
)
type scriptedHTTPResponse struct {
status int
body string
}
type scriptedHTTPClient struct {
mu sync.Mutex
responses map[string][]scriptedHTTPResponse
calls []string
}
func testIsUploadRetryableAssignError(err error) bool {
if err == nil {
return false
}
for _, retryable := range uploadRetryableAssignErrList {
if strings.Contains(err.Error(), retryable) {
return true
}
}
return false
}
func (c *scriptedHTTPClient) Do(req *http.Request) (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()
url := req.URL.String()
c.calls = append(c.calls, url)
plans := c.responses[url]
if len(plans) == 0 {
return nil, fmt.Errorf("unexpected request to %s", url)
}
plan := plans[0]
c.responses[url] = plans[1:]
return &http.Response{
StatusCode: plan.status,
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(plan.body)),
}, nil
}
func TestIsUploadRetryableAssignError(t *testing.T) {
testCases := []struct {
name string
err error
want bool
}{
{name: "nil", err: nil, want: false},
{name: "transport", err: fmt.Errorf("transport is closing"), want: true},
{name: "read only", err: fmt.Errorf("volume 1 is read only"), want: true},
{name: "volume full", err: fmt.Errorf("failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"), want: true},
{name: "other permanent", err: fmt.Errorf("mismatching cookie"), want: false},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if got := testIsUploadRetryableAssignError(tc.err); got != tc.want {
t.Fatalf("testIsUploadRetryableAssignError(%v) = %v, want %v", tc.err, got, tc.want)
}
})
}
}
func TestUploadWithRetryDataReassignsOnVolumeSizeExceeded(t *testing.T) {
httpClient := &scriptedHTTPClient{
responses: map[string][]scriptedHTTPResponse{
"http://volume-a/1,first": {
{status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`},
{status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`},
{status: http.StatusInternalServerError, body: `{"error":"failed to write to local disk: append to volume 1 size 0 actualSize 0: Volume Size 33555976 Exceeded 33554432"}`},
},
"http://volume-b/2,second": {
{status: http.StatusCreated, body: `{"name":"test.bin","size":3}`},
},
},
}
uploader := newUploader(httpClient)
assignCalls := 0
fileID, uploadResult, err := uploader.uploadWithRetryData(func() (string, string, security.EncodedJwt, error) {
assignCalls++
if assignCalls == 1 {
return "1,first", "volume-a", "", nil
}
return "2,second", "volume-b", "", nil
}, &UploadOption{Filename: "test.bin"}, func(host, fileId string) string {
return "http://" + host + "/" + fileId
}, []byte("abc"))
if err != nil {
t.Fatalf("expected success after reassignment, got %v", err)
}
if fileID != "2,second" {
t.Fatalf("expected second file id, got %s", fileID)
}
if assignCalls != 2 {
t.Fatalf("expected 2 assign attempts, got %d", assignCalls)
}
if uploadResult == nil || uploadResult.Name != "test.bin" {
t.Fatalf("expected successful upload result, got %#v", uploadResult)
}
if len(httpClient.calls) != 4 {
t.Fatalf("expected 4 upload attempts (3 same-url retries + 1 reassigned upload), got %d", len(httpClient.calls))
}
if httpClient.calls[0] != "http://volume-a/1,first" || httpClient.calls[3] != "http://volume-b/2,second" {
t.Fatalf("unexpected upload call sequence: %#v", httpClient.calls)
}
}
Loading…
Cancel
Save