Browse Source

parallelize remote content fetching

pull/2274/head
Chris Lu 4 years ago
parent
commit
f365af81c2
  1. 112
      weed/server/filer_grpc_server_remote.go

112
weed/server/filer_grpc_server_remote.go

@ -78,66 +78,74 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
var chunks []*filer_pb.FileChunk
var fetchAndWriteErr error
// FIXME limit on parallel
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
size := chunkSize
if offset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - offset
}
localOffset := offset
// assign one volume server
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
return resp, err
}
if assignResult.Error != "" {
return resp, fmt.Errorf("assign: %v", assignResult.Error)
}
fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
if assignResult.Error != "" {
return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
}
limitedConcurrentExecutor.Execute(func() {
size := chunkSize
if localOffset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - localOffset
}
// tell filer to tell volume server to download into needles
err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
Offset: offset,
Size: size,
RemoteType: storageConf.Type,
RemoteName: storageConf.Name,
S3AccessKey: storageConf.S3AccessKey,
S3SecretKey: storageConf.S3SecretKey,
S3Region: storageConf.S3Region,
S3Endpoint: storageConf.S3Endpoint,
RemoteBucket: remoteStorageMountedLocation.Bucket,
RemotePath: string(dest),
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
// assign one volume server
assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
fetchAndWriteErr = err
return
}
if assignResult.Error != "" {
fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
return
}
fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
if assignResult.Error != "" {
fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
return
}
return nil
})
if err != nil {
return nil, err
}
// tell filer to tell volume server to download into needles
err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
Offset: localOffset,
Size: size,
RemoteType: storageConf.Type,
RemoteName: storageConf.Name,
S3AccessKey: storageConf.S3AccessKey,
S3SecretKey: storageConf.S3SecretKey,
S3Region: storageConf.S3Region,
S3Endpoint: storageConf.S3Endpoint,
RemoteBucket: remoteStorageMountedLocation.Bucket,
RemotePath: string(dest),
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
}
return nil
})
chunks = append(chunks, &filer_pb.FileChunk{
FileId: assignResult.Fid,
Offset: offset,
Size: uint64(size),
Mtime: time.Now().Unix(),
Fid: &filer_pb.FileId{
VolumeId: uint32(fileId.VolumeId),
FileKey: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
},
})
if err != nil {
fetchAndWriteErr = err
return
}
chunks = append(chunks, &filer_pb.FileChunk{
FileId: assignResult.Fid,
Offset: localOffset,
Size: uint64(size),
Mtime: time.Now().Unix(),
Fid: &filer_pb.FileId{
VolumeId: uint32(fileId.VolumeId),
FileKey: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
},
})
})
}
garbage := entry.Chunks

Loading…
Cancel
Save