You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
5.8 KiB

3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/golang/protobuf/proto"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
  18. // load all mappings
  19. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  20. if err != nil {
  21. return nil, err
  22. }
  23. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  24. if err != nil {
  25. return nil, err
  26. }
  27. // find mapping
  28. var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
  29. var localMountedDir string
  30. for k, loc := range mappings.Mappings {
  31. if strings.HasPrefix(req.Directory, k) {
  32. localMountedDir, remoteStorageMountedLocation = k, loc
  33. }
  34. }
  35. if localMountedDir == "" {
  36. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  37. }
  38. // find storage configuration
  39. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  40. if err != nil {
  41. return nil, err
  42. }
  43. storageConf := &remote_pb.RemoteConf{}
  44. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  45. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  46. }
  47. // find the entry
  48. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  49. if err == filer_pb.ErrNotFound {
  50. return nil, err
  51. }
  52. resp := &filer_pb.DownloadToLocalResponse{}
  53. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  54. return resp, nil
  55. }
  56. // detect storage option
  57. so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "")
  58. if err != nil {
  59. return resp, err
  60. }
  61. assignRequest, altRequest := so.ToAssignRequests(1)
  62. // find a good chunk size
  63. chunkSize := int64(5 * 1024 * 1024)
  64. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  65. for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
  66. chunkSize *= 2
  67. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  68. }
  69. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  70. var chunks []*filer_pb.FileChunk
  71. var fetchAndWriteErr error
  72. var wg sync.WaitGroup
  73. limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
  74. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  75. localOffset := offset
  76. wg.Add(1)
  77. limitedConcurrentExecutor.Execute(func() {
  78. defer wg.Done()
  79. size := chunkSize
  80. if localOffset+chunkSize > entry.Remote.RemoteSize {
  81. size = entry.Remote.RemoteSize - localOffset
  82. }
  83. // assign one volume server
  84. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  85. if err != nil {
  86. fetchAndWriteErr = err
  87. return
  88. }
  89. if assignResult.Error != "" {
  90. fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
  91. return
  92. }
  93. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  94. if assignResult.Error != "" {
  95. fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  96. return
  97. }
  98. var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
  99. for _, r := range assignResult.Replicas {
  100. replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
  101. Url: r.Url,
  102. PublicUrl: r.PublicUrl,
  103. })
  104. }
  105. // tell filer to tell volume server to download into needles
  106. err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  107. _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  108. VolumeId: uint32(fileId.VolumeId),
  109. NeedleId: uint64(fileId.Key),
  110. Cookie: uint32(fileId.Cookie),
  111. Offset: localOffset,
  112. Size: size,
  113. Replicas: replicas,
  114. Auth: string(assignResult.Auth),
  115. RemoteConf: storageConf,
  116. RemoteLocation: &remote_pb.RemoteStorageLocation{
  117. Name: remoteStorageMountedLocation.Name,
  118. Bucket: remoteStorageMountedLocation.Bucket,
  119. Path: string(dest),
  120. },
  121. })
  122. if fetchAndWriteErr != nil {
  123. return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
  124. }
  125. return nil
  126. })
  127. if err != nil && fetchAndWriteErr == nil {
  128. fetchAndWriteErr = err
  129. return
  130. }
  131. chunks = append(chunks, &filer_pb.FileChunk{
  132. FileId: assignResult.Fid,
  133. Offset: localOffset,
  134. Size: uint64(size),
  135. Mtime: time.Now().Unix(),
  136. Fid: &filer_pb.FileId{
  137. VolumeId: uint32(fileId.VolumeId),
  138. FileKey: uint64(fileId.Key),
  139. Cookie: uint32(fileId.Cookie),
  140. },
  141. })
  142. })
  143. }
  144. wg.Wait()
  145. if fetchAndWriteErr != nil {
  146. return nil, fetchAndWriteErr
  147. }
  148. garbage := entry.Chunks
  149. newEntry := entry.ShallowClone()
  150. newEntry.Chunks = chunks
  151. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  152. newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
  153. // this skips meta data log events
  154. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  155. return nil, err
  156. }
  157. fs.filer.DeleteChunks(garbage)
  158. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  159. resp.Entry = newEntry.ToProtoEntry()
  160. return resp, nil
  161. }