From 6c8822f269c1dbddcc80383137417b5f071652b0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 20 Aug 2022 18:59:57 -0700 Subject: [PATCH] filer.copy: retryable file part upload --- weed/command/filer_copy.go | 69 ++++++++++++++------------------------ 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index a196d25f3..5fbddc07e 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -454,58 +454,41 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, wg.Done() <-concurrentChunks }() - // assign a volume - var assignResult *filer_pb.AssignVolumeResponse - var assignError error - err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, - DiskType: *worker.options.diskType, - Path: task.destinationUrlPath + fileName, - } - - assignResult, assignError = client.AssignVolume(context.Background(), request) - if assignError != nil { - return fmt.Errorf("assign volume failure %v: %v", request, assignError) - } - if assignResult.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) - } - return nil - }) - }) - if err != nil { - uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) - return - } - targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId + fileId, uploadResult, err, _ := operation.UploadWithRetry( + worker, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + DiskType: *worker.options.diskType, + Path: task.destinationUrlPath + fileName, + }, + &operation.UploadOption{ + Filename: fileName + "-" + strconv.FormatInt(i+1, 10), + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + }, + func(host, fileId string) string { + return fmt.Sprintf("http://%s/%s", host, fileId) + }, + io.NewSectionReader(f, i*chunkSize, chunkSize), + ) - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, - Filename: fileName + "-" + strconv.FormatInt(i+1, 10), - Cipher: worker.options.cipher, - IsInputCompressed: false, - MimeType: "", - PairMap: nil, - Jwt: security.EncodedJwt(assignResult.Auth), - } - uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption) if err != nil { - uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err) return } if uploadResult.Error != "" { - uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error) return } - chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize) + chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize) - fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) }(i) } wg.Wait()