From af207bbaf05ce441e9a87f0833c09a36c0629e85 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 1 Oct 2021 23:23:39 -0700 Subject: [PATCH] retry both assign volume and uploading data fix https://github.com/chrislusf/seaweedfs/issues/2351 --- weed/command/filer_copy.go | 62 +++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 0feae63b3..630f066d6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -344,9 +344,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return err } - // assign a volume - err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("upload", func() error { + // assign a volume + assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -369,35 +369,41 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }) - }) - if err != nil { - return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) - } + if assignErr != nil { + return assignErr + } - targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, - Filename: fileName, - Cipher: worker.options.cipher, - IsInputCompressed: false, - MimeType: mimeType, - PairMap: nil, - Jwt: security.EncodedJwt(assignResult.Auth), - } - uploadResult, err := operation.UploadData(data, uploadOption) + // uplaod data + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: fileName, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: mimeType, + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err := operation.UploadData(data, uploadOption) + if err != nil { + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + } + if uploadResult.Error != "" { + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + } + if *worker.options.verbose { + fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + } + + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) + + return nil + }) if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return fmt.Errorf("upload %v: %v\n", fileName, err) } - if *worker.options.verbose { - fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) - } - - chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) } if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {