You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.2 KiB

3 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/golang/protobuf/proto"
  12. "strings"
  13. "time"
  14. )
  15. func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
  16. // load all mappings
  17. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  18. if err != nil {
  19. return nil, err
  20. }
  21. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // find mapping
  26. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  27. var localMountedDir string
  28. for k, loc := range mappings.Mappings {
  29. if strings.HasPrefix(req.Directory, k) {
  30. localMountedDir, remoteStorageMountedLocation = k, loc
  31. }
  32. }
  33. if localMountedDir == "" {
  34. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  35. }
  36. // find storage configuration
  37. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  38. if err != nil {
  39. return nil, err
  40. }
  41. storageConf := &filer_pb.RemoteConf{}
  42. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  43. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  44. }
  45. // find the entry
  46. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  47. if err == filer_pb.ErrNotFound {
  48. return nil, err
  49. }
  50. resp := &filer_pb.DownloadToLocalResponse{}
  51. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  52. return resp, nil
  53. }
  54. // detect storage option
  55. // replication level is set to "000" to ensure only need to ask one volume server to fetch the data.
  56. so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
  57. if err != nil {
  58. return resp, err
  59. }
  60. assignRequest, altRequest := so.ToAssignRequests(1)
  61. // find a good chunk size
  62. chunkSize := int64(5 * 1024 * 1024)
  63. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  64. for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
  65. chunkSize *= 2
  66. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  67. }
  68. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  69. var chunks []*filer_pb.FileChunk
  70. // FIXME limit on parallel
  71. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  72. size := chunkSize
  73. if offset+chunkSize > entry.Remote.RemoteSize {
  74. size = entry.Remote.RemoteSize - offset
  75. }
  76. // assign one volume server
  77. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  78. if err != nil {
  79. return resp, err
  80. }
  81. if assignResult.Error != "" {
  82. return resp, fmt.Errorf("assign: %v", assignResult.Error)
  83. }
  84. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  85. if assignResult.Error != "" {
  86. return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  87. }
  88. // tell filer to tell volume server to download into needles
  89. err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  90. _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  91. VolumeId: uint32(fileId.VolumeId),
  92. NeedleId: uint64(fileId.Key),
  93. Cookie: uint32(fileId.Cookie),
  94. Offset: offset,
  95. Size: size,
  96. RemoteType: storageConf.Type,
  97. RemoteName: storageConf.Name,
  98. S3AccessKey: storageConf.S3AccessKey,
  99. S3SecretKey: storageConf.S3SecretKey,
  100. S3Region: storageConf.S3Region,
  101. S3Endpoint: storageConf.S3Endpoint,
  102. RemoteBucket: remoteStorageMountedLocation.Bucket,
  103. RemotePath: string(dest),
  104. })
  105. if fetchAndWriteErr != nil {
  106. return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
  107. }
  108. return nil
  109. })
  110. if err != nil {
  111. return nil, err
  112. }
  113. chunks = append(chunks, &filer_pb.FileChunk{
  114. FileId: assignResult.Fid,
  115. Offset: offset,
  116. Size: uint64(size),
  117. Mtime: time.Now().Unix(),
  118. Fid: &filer_pb.FileId{
  119. VolumeId: uint32(fileId.VolumeId),
  120. FileKey: uint64(fileId.Key),
  121. Cookie: uint32(fileId.Cookie),
  122. },
  123. })
  124. }
  125. garbage := entry.Chunks
  126. newEntry := entry.ShallowClone()
  127. newEntry.Chunks = chunks
  128. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  129. newEntry.Remote.LocalMtime = time.Now().Unix()
  130. // this skips meta data log events
  131. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  132. return nil, err
  133. }
  134. fs.filer.DeleteChunks(garbage)
  135. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  136. resp.Entry = newEntry.ToProtoEntry()
  137. return resp, nil
  138. }