You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
5.1 KiB

3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/golang/protobuf/proto"
  12. "strings"
  13. "time"
  14. )
  15. func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
  16. // load all mappings
  17. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  18. if err != nil {
  19. return nil, err
  20. }
  21. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // find mapping
  26. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  27. var localMountedDir string
  28. for k, loc := range mappings.Mappings {
  29. if strings.HasPrefix(req.Directory, k) {
  30. localMountedDir, remoteStorageMountedLocation = k, loc
  31. }
  32. }
  33. if localMountedDir == "" {
  34. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  35. }
  36. // find storage configuration
  37. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  38. if err != nil {
  39. return nil, err
  40. }
  41. storageConf := &filer_pb.RemoteConf{}
  42. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  43. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  44. }
  45. // find the entry
  46. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  47. if err == filer_pb.ErrNotFound {
  48. return nil, err
  49. }
  50. resp := &filer_pb.DownloadToLocalResponse{}
  51. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  52. return resp, nil
  53. }
  54. // detect storage option
  55. so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
  56. if err != nil {
  57. return resp, err
  58. }
  59. assignRequest, altRequest := so.ToAssignRequests(1)
  60. // find a good chunk size
  61. chunkSize := int64(5 * 1024 * 1024)
  62. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  63. for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
  64. chunkSize *= 2
  65. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  66. }
  67. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  68. var chunks []*filer_pb.FileChunk
  69. // FIXME limit on parallel
  70. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  71. size := chunkSize
  72. if offset+chunkSize > entry.Remote.RemoteSize {
  73. size = entry.Remote.RemoteSize - offset
  74. }
  75. // assign one volume server
  76. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  77. if err != nil {
  78. return resp, err
  79. }
  80. if assignResult.Error != "" {
  81. return resp, fmt.Errorf("assign: %v", assignResult.Error)
  82. }
  83. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  84. if assignResult.Error != "" {
  85. return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  86. }
  87. // tell filer to tell volume server to download into needles
  88. err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  89. _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  90. VolumeId: uint32(fileId.VolumeId),
  91. NeedleId: uint64(fileId.Key),
  92. Cookie: uint32(fileId.Cookie),
  93. Offset: offset,
  94. Size: size,
  95. RemoteType: storageConf.Type,
  96. RemoteName: storageConf.Name,
  97. S3AccessKey: storageConf.S3AccessKey,
  98. S3SecretKey: storageConf.S3SecretKey,
  99. S3Region: storageConf.S3Region,
  100. S3Endpoint: storageConf.S3Endpoint,
  101. RemoteBucket: remoteStorageMountedLocation.Bucket,
  102. RemotePath: string(dest),
  103. })
  104. if fetchAndWriteErr != nil {
  105. return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
  106. }
  107. return nil
  108. })
  109. if err != nil {
  110. return nil, err
  111. }
  112. chunks = append(chunks, &filer_pb.FileChunk{
  113. FileId: assignResult.Fid,
  114. Offset: offset,
  115. Size: uint64(size),
  116. Mtime: time.Now().Unix(),
  117. Fid: &filer_pb.FileId{
  118. VolumeId: uint32(fileId.VolumeId),
  119. FileKey: uint64(fileId.Key),
  120. Cookie: uint32(fileId.Cookie),
  121. },
  122. })
  123. }
  124. garbage := entry.Chunks
  125. newEntry := entry.ShallowClone()
  126. newEntry.Chunks = chunks
  127. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  128. newEntry.Remote.LocalMtime = time.Now().Unix()
  129. // this skips meta data log events
  130. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  131. return nil, err
  132. }
  133. fs.filer.DeleteChunks(garbage)
  134. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  135. resp.Entry = newEntry.ToProtoEntry()
  136. return resp, nil
  137. }