Browse Source

filer: parallize weed filer.copy for single file

pull/1096/head
Chris Lu 5 years ago
parent
commit
5d78871487
  1. 49
      weed/command/filer_copy.go

49
weed/command/filer_copy.go

@ -35,7 +35,8 @@ type CopyOptions struct {
ttl *string ttl *string
maxMB *int maxMB *int
masterClient *wdclient.MasterClient masterClient *wdclient.MasterClient
concurrency *int
concurrenctFiles *int
concurrenctChunks *int
compressionLevel *int compressionLevel *int
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
masters []string masters []string
@ -49,7 +50,8 @@ func init() {
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
} }
@ -131,7 +133,7 @@ func runCopy(cmd *Command, args []string) bool {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
} }
fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
go func() { go func() {
defer close(fileCopyTaskChan) defer close(fileCopyTaskChan)
@ -142,7 +144,7 @@ func runCopy(cmd *Command, args []string) bool {
} }
} }
}() }()
for i := 0; i < *copy.concurrency; i++ {
for i := 0; i < *copy.concurrenctFiles; i++ {
waitGroup.Add(1) waitGroup.Add(1)
go func() { go func() {
defer waitGroup.Done() defer waitGroup.Done()
@ -345,10 +347,21 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
fileName := filepath.Base(f.Name()) fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f) mimeType := detectMimeType(f)
var chunks []*filer_pb.FileChunk
chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
for i := int64(0); i < int64(chunkCount); i++ {
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
wg.Add(1)
concurrentChunks <- struct{}{}
go func(i int64) {
defer func() {
wg.Done()
<-concurrentChunks
}()
// assign a volume // assign a volume
assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
Count: 1, Count: 1,
@ -364,22 +377,36 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
uploadResult, err := operation.Upload(targetUrl, uploadResult, err := operation.Upload(targetUrl,
fileName+"-"+strconv.FormatInt(i+1, 10), fileName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(f, chunkSize),
io.NewSectionReader(f, i*chunkSize, chunkSize),
false, "application/octet-stream", nil, assignResult.Auth) false, "application/octet-stream", nil, assignResult.Auth)
if err != nil { if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
} }
if uploadResult.Error != "" { if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
return
} }
chunks = append(chunks, &filer_pb.FileChunk{
chunksChan <- &filer_pb.FileChunk{
FileId: assignResult.Fid, FileId: assignResult.Fid,
Offset: i * chunkSize, Offset: i * chunkSize,
Size: uint64(uploadResult.Size), Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(), Mtime: time.Now().UnixNano(),
ETag: uploadResult.ETag, ETag: uploadResult.ETag,
})
}
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
}
wg.Wait()
close(chunksChan)
if uploadError != nil {
return uploadError
}
var chunks []*filer_pb.FileChunk
for chunk := range chunksChan {
chunks = append(chunks, chunk)
} }
if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

Loading…
Cancel
Save