Browse Source

refactoring

pull/3472/head
chrislu 2 years ago
parent
commit
973f6dd162
  1. 64
      weed/command/filer_copy.go

64
weed/command/filer_copy.go

@ -331,8 +331,6 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
var mimeType string var mimeType string
var chunks []*filer_pb.FileChunk var chunks []*filer_pb.FileChunk
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 { if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 {
@ -342,66 +340,32 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err return err
} }
err = util.Retry("upload", func() error {
// assign a volume
assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1, Count: 1,
Replication: *worker.options.replication, Replication: *worker.options.replication,
Collection: *worker.options.collection, Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec, TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType, DiskType: *worker.options.diskType,
Path: task.destinationUrlPath, Path: task.destinationUrlPath,
}
assignResult, assignError = client.AssignVolume(context.Background(), request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
if assignResult.Location.Url == "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
}
return nil
})
if assignErr != nil {
return assignErr
}
// upload data
targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
},
&operation.UploadOption{
Filename: fileName, Filename: fileName,
Cipher: worker.options.cipher, Cipher: worker.options.cipher,
IsInputCompressed: false, IsInputCompressed: false,
MimeType: mimeType, MimeType: mimeType,
PairMap: nil, PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
if *worker.options.verbose {
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
}
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
return nil
})
if err != nil {
return fmt.Errorf("upload %v: %v\n", fileName, err)
},
func(host, fileId string) string {
return fmt.Sprintf("http://%s/%s", host, fileId)
},
util.NewBytesReader(data),
)
if flushErr != nil {
return flushErr
} }
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0))
} }
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

Loading…
Cancel
Save