Browse Source

filer.copy: retryable upload

pull/3472/head
chrislu 2 years ago
parent
commit
d49d0a9fc2
  1. 94
      weed/command/filer_copy.go
  2. 49
      weed/operation/upload_content.go

94
weed/command/filer_copy.go

@ -585,59 +585,57 @@ func detectMimeType(f *os.File) string {
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) { func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
var fileId, host string
var auth security.EncodedJwt
var finalFileId string
uploadResult, flushErr, _ := operation.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: name,
},
&operation.UploadOption{
Filename: name,
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
},
func(host, fileId string) string {
finalFileId = fileId
return fmt.Sprintf("http://%s/%s", host, fileId)
},
reader,
)
if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
assignErr := util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: name,
}
resp, err := client.AssignVolume(ctx, request)
if err != nil {
return fmt.Errorf("assign volume failure %v: %v", request, err)
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
return nil
})
if assignErr != nil {
return assignErr
}
return nil
}); flushErr != nil {
return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
uploadOption := &operation.UploadOption{
UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId),
Filename: name,
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: auth,
}
uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil { if flushErr != nil {
return nil, fmt.Errorf("upload data: %v", flushErr) return nil, fmt.Errorf("upload data: %v", flushErr)
} }
if uploadResult.Error != "" { if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error) return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
} }
return uploadResult.ToPbFileChunk(fileId, offset), nil
return uploadResult.ToPbFileChunk(finalFileId, offset), nil
}
var _ = filer_pb.FilerClient(&FileCopyWorker{})
func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
filerGrpcAddress := worker.filerAddress.ToGrpcAddress()
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, worker.options.grpcDialOption)
return
}
func (worker *FileCopyWorker) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func (worker *FileCopyWorker) GetDataCenter() string {
return ""
} }

49
weed/operation/upload_content.go

@ -2,6 +2,7 @@ package operation
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -28,6 +29,7 @@ type UploadOption struct {
MimeType string MimeType string
PairMap map[string]string PairMap map[string]string
Jwt security.EncodedJwt Jwt security.EncodedJwt
RetryForever bool
} }
type UploadResult struct { type UploadResult struct {
@ -76,6 +78,53 @@ func init() {
}} }}
} }
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (uploadResult *UploadResult, err error, data []byte) {
doUploadFunc := func() error {
var fileId, host string
var auth security.EncodedJwt
// grpc assign volume
if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
if assignErr != nil {
glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
return assignErr
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
}
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
loc := resp.Location
host = filerClient.AdjustedUrl(loc)
return nil
}); grpcAssignErr != nil {
return fmt.Errorf("filerGrpcAddress assign volume: %v", grpcAssignErr)
}
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
uploadOption.Jwt = auth
var uploadErr error
uploadResult, uploadErr, data = doUpload(reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err)
return true
})
} else {
err = util.Retry("uploadWithRetry", doUploadFunc)
}
return
}
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level // Upload sends a POST request to a volume server to upload the content with adjustable compression level

Loading…
Cancel
Save