Browse Source

cloud drive: parallelize remote storage downloading

pull/2283/head
Chris Lu 3 years ago
parent
commit
6a0bb7106b
  1. 5
      weed/server/filer_grpc_server_remote.go
  2. 34
      weed/shell/command_remote_cache.go

5
weed/server/filer_grpc_server_remote.go

@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"strings" "strings"
"sync"
"time" "time"
) )
@ -80,12 +81,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
var chunks []*filer_pb.FileChunk var chunks []*filer_pb.FileChunk
var fetchAndWriteErr error var fetchAndWriteErr error
var wg sync.WaitGroup
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
localOffset := offset localOffset := offset
wg.Add(1)
limitedConcurrentExecutor.Execute(func() { limitedConcurrentExecutor.Execute(func() {
defer wg.Done()
size := chunkSize size := chunkSize
if localOffset+chunkSize > entry.Remote.RemoteSize { if localOffset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - localOffset size = entry.Remote.RemoteSize - localOffset
@ -147,6 +151,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
}) })
} }
wg.Wait()
if fetchAndWriteErr != nil { if fetchAndWriteErr != nil {
return nil, fetchAndWriteErr return nil, fetchAndWriteErr
} }

34
weed/shell/command_remote_cache.go

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"io" "io"
"sync"
) )
func init() { func init() {
@ -50,6 +51,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
dir := remoteMountCommand.String("dir", "", "a directory in filer") dir := remoteMountCommand.String("dir", "", "a directory in filer")
concurrency := remoteMountCommand.Int("concurrent", 8, "concurrent file downloading")
fileFiler := newFileFilter(remoteMountCommand) fileFiler := newFileFilter(remoteMountCommand)
if err = remoteMountCommand.Parse(args); err != nil { if err = remoteMountCommand.Parse(args); err != nil {
@ -63,7 +65,7 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io
} }
// pull content from remote // pull content from remote
if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf); err != nil {
if err = c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), fileFiler, remoteStorageConf, *concurrency); err != nil {
return fmt.Errorf("cache content data: %v", err) return fmt.Errorf("cache content data: %v", err)
} }
@ -117,9 +119,13 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
return false return false
} }
func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf) error {
func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
var wg sync.WaitGroup
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
var executionErr error
traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
if !shouldCacheToLocal(entry) { if !shouldCacheToLocal(entry) {
return true // true means recursive traversal should continue return true // true means recursive traversal should continue
} }
@ -128,16 +134,32 @@ func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.
return true return true
} }
fmt.Fprintf(writer, "Cache %+v ... ", dir.Child(entry.Name))
wg.Add(1)
limitedConcurrentExecutor.Execute(func() {
defer wg.Done()
fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { if err := filer.DownloadToLocal(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err) fmt.Fprintf(writer, "DownloadToLocal %+v: %v\n", remoteLocation, err)
return false
if executionErr == nil {
executionErr = fmt.Errorf("DownloadToLocal %+v: %v\n", remoteLocation, err)
} }
fmt.Fprintf(writer, "Done\n")
return
}
fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
})
return true return true
}) })
wg.Wait()
if traverseErr != nil {
return traverseErr
}
if executionErr != nil {
return executionErr
}
return nil
} }
Loading…
Cancel
Save