You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
5.5 KiB

3 years ago
3 years ago
  1. package weed_server
  import (
  	"context"
  	"fmt"
  	"strings"
  	"sync"
  	"time"

  	"github.com/chrislusf/seaweedfs/weed/filer"
  	"github.com/chrislusf/seaweedfs/weed/operation"
  	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  	"github.com/chrislusf/seaweedfs/weed/storage/needle"
  	"github.com/chrislusf/seaweedfs/weed/util"
  	"github.com/golang/protobuf/proto"
  )
  15. func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
  16. // load all mappings
  17. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  18. if err != nil {
  19. return nil, err
  20. }
  21. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // find mapping
  26. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  27. var localMountedDir string
  28. for k, loc := range mappings.Mappings {
  29. if strings.HasPrefix(req.Directory, k) {
  30. localMountedDir, remoteStorageMountedLocation = k, loc
  31. }
  32. }
  33. if localMountedDir == "" {
  34. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  35. }
  36. // find storage configuration
  37. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  38. if err != nil {
  39. return nil, err
  40. }
  41. storageConf := &filer_pb.RemoteConf{}
  42. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  43. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  44. }
  45. // find the entry
  46. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  47. if err == filer_pb.ErrNotFound {
  48. return nil, err
  49. }
  50. resp := &filer_pb.DownloadToLocalResponse{}
  51. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  52. return resp, nil
  53. }
  54. // detect storage option
  55. // replication level is set to "000" to ensure only need to ask one volume server to fetch the data.
  56. so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
  57. if err != nil {
  58. return resp, err
  59. }
  60. assignRequest, altRequest := so.ToAssignRequests(1)
  61. // find a good chunk size
  62. chunkSize := int64(5 * 1024 * 1024)
  63. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  64. for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
  65. chunkSize *= 2
  66. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  67. }
  68. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  69. var chunks []*filer_pb.FileChunk
  70. var fetchAndWriteErr error
  71. limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
  72. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  73. localOffset := offset
  74. limitedConcurrentExecutor.Execute(func() {
  75. size := chunkSize
  76. if localOffset+chunkSize > entry.Remote.RemoteSize {
  77. size = entry.Remote.RemoteSize - localOffset
  78. }
  79. // assign one volume server
  80. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  81. if err != nil {
  82. fetchAndWriteErr = err
  83. return
  84. }
  85. if assignResult.Error != "" {
  86. fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
  87. return
  88. }
  89. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  90. if assignResult.Error != "" {
  91. fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  92. return
  93. }
  94. // tell filer to tell volume server to download into needles
  95. err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  96. _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  97. VolumeId: uint32(fileId.VolumeId),
  98. NeedleId: uint64(fileId.Key),
  99. Cookie: uint32(fileId.Cookie),
  100. Offset: localOffset,
  101. Size: size,
  102. RemoteType: storageConf.Type,
  103. RemoteName: storageConf.Name,
  104. S3AccessKey: storageConf.S3AccessKey,
  105. S3SecretKey: storageConf.S3SecretKey,
  106. S3Region: storageConf.S3Region,
  107. S3Endpoint: storageConf.S3Endpoint,
  108. RemoteBucket: remoteStorageMountedLocation.Bucket,
  109. RemotePath: string(dest),
  110. })
  111. if fetchAndWriteErr != nil {
  112. return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
  113. }
  114. return nil
  115. })
  116. if err != nil {
  117. fetchAndWriteErr = err
  118. return
  119. }
  120. chunks = append(chunks, &filer_pb.FileChunk{
  121. FileId: assignResult.Fid,
  122. Offset: localOffset,
  123. Size: uint64(size),
  124. Mtime: time.Now().Unix(),
  125. Fid: &filer_pb.FileId{
  126. VolumeId: uint32(fileId.VolumeId),
  127. FileKey: uint64(fileId.Key),
  128. Cookie: uint32(fileId.Cookie),
  129. },
  130. })
  131. })
  132. }
  133. if fetchAndWriteErr != nil {
  134. return nil, fetchAndWriteErr
  135. }
  136. garbage := entry.Chunks
  137. newEntry := entry.ShallowClone()
  138. newEntry.Chunks = chunks
  139. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  140. newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
  141. // this skips meta data log events
  142. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  143. return nil, err
  144. }
  145. fs.filer.DeleteChunks(garbage)
  146. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  147. resp.Entry = newEntry.ToProtoEntry()
  148. return resp, nil
  149. }