diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 8144d6a90..54e9445ce 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -78,66 +78,74 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):]) var chunks []*filer_pb.FileChunk + var fetchAndWriteErr error - // FIXME limit on parallel + limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { - size := chunkSize - if offset+chunkSize > entry.Remote.RemoteSize { - size = entry.Remote.RemoteSize - offset - } + localOffset := offset - // assign one volume server - assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) - if err != nil { - return resp, err - } - if assignResult.Error != "" { - return resp, fmt.Errorf("assign: %v", assignResult.Error) - } - fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid) - if assignResult.Error != "" { - return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) - } + limitedConcurrentExecutor.Execute(func() { + size := chunkSize + if localOffset+chunkSize > entry.Remote.RemoteSize { + size = entry.Remote.RemoteSize - localOffset + } - // tell filer to tell volume server to download into needles - err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ - VolumeId: uint32(fileId.VolumeId), - NeedleId: uint64(fileId.Key), - Cookie: uint32(fileId.Cookie), - Offset: offset, - Size: size, - RemoteType: storageConf.Type, - RemoteName: storageConf.Name, - S3AccessKey: storageConf.S3AccessKey, - S3SecretKey: storageConf.S3SecretKey, - S3Region: storageConf.S3Region, - S3Endpoint: storageConf.S3Endpoint, - RemoteBucket: remoteStorageMountedLocation.Bucket, - RemotePath: string(dest), - }) - if fetchAndWriteErr != nil { - return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) + // assign one volume server + assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + if err != nil { + fetchAndWriteErr = err + return + } + if assignResult.Error != "" { + fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error) + return + } + fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid) + if assignResult.Error != "" { + fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) + return } - return nil - }) - if err != nil { - return nil, err - } + // tell filer to tell volume server to download into needles + err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ + VolumeId: uint32(fileId.VolumeId), + NeedleId: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + Offset: localOffset, + Size: size, + RemoteType: storageConf.Type, + RemoteName: storageConf.Name, + S3AccessKey: storageConf.S3AccessKey, + S3SecretKey: storageConf.S3SecretKey, + S3Region: storageConf.S3Region, + S3Endpoint: storageConf.S3Endpoint, + RemoteBucket: remoteStorageMountedLocation.Bucket, + RemotePath: string(dest), + }) + if fetchAndWriteErr != nil { + return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) + } + return nil + }) - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: offset, - Size: uint64(size), - Mtime: time.Now().Unix(), - Fid: &filer_pb.FileId{ - VolumeId: uint32(fileId.VolumeId), - FileKey: uint64(fileId.Key), - Cookie: uint32(fileId.Cookie), - }, - }) + if err != nil { + fetchAndWriteErr = err + return + } + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: localOffset, + Size: uint64(size), + Mtime: time.Now().Unix(), + Fid: &filer_pb.FileId{ + VolumeId: uint32(fileId.VolumeId), + FileKey: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + }, + }) + }) } garbage := entry.Chunks