You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

161 lines
5.0 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/golang/protobuf/proto"
  12. "strings"
  13. "time"
  14. )
  15. func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
  16. // load all mappings
  17. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  18. if err != nil {
  19. return nil, err
  20. }
  21. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  22. if err != nil {
  23. return nil, err
  24. }
  25. // find mapping
  26. var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
  27. var localMountedDir string
  28. for k, loc := range mappings.Mappings {
  29. if strings.HasPrefix(req.Directory, k) {
  30. localMountedDir, remoteStorageMountedLocation = k, loc
  31. }
  32. }
  33. if localMountedDir == "" {
  34. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  35. }
  36. // find storage configuration
  37. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  38. if err != nil {
  39. return nil, err
  40. }
  41. storageConf := &filer_pb.RemoteConf{}
  42. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  43. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  44. }
  45. // find the entry
  46. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  47. if err == filer_pb.ErrNotFound {
  48. return nil, err
  49. }
  50. resp := &filer_pb.DownloadToLocalResponse{}
  51. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  52. return resp, nil
  53. }
  54. // detect storage option
  55. so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
  56. if err != nil {
  57. return resp, err
  58. }
  59. assignRequest, altRequest := so.ToAssignRequests(1)
  60. // find a good chunk size
  61. chunkSize := int64(5 * 1024 * 1024)
  62. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  63. for chunkCount > 1000 {
  64. chunkSize *= 2
  65. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  66. }
  67. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  68. var chunks []*filer_pb.FileChunk
  69. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  70. size := chunkSize
  71. if offset+chunkSize > entry.Remote.RemoteSize {
  72. size = entry.Remote.RemoteSize - offset
  73. }
  74. // assign one volume server
  75. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  76. if err != nil {
  77. return resp, err
  78. }
  79. if assignResult.Error != "" {
  80. return resp, fmt.Errorf("assign: %v", assignResult.Error)
  81. }
  82. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  83. if assignResult.Error != "" {
  84. return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  85. }
  86. // tell filer to tell volume server to download into needles
  87. err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  88. _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  89. VolumeId: uint32(fileId.VolumeId),
  90. NeedleId: uint64(fileId.Key),
  91. Cookie: uint32(fileId.Cookie),
  92. Offset: offset,
  93. Size: size,
  94. RemoteType: storageConf.Type,
  95. RemoteName: storageConf.Name,
  96. S3AccessKey: storageConf.S3AccessKey,
  97. S3SecretKey: storageConf.S3SecretKey,
  98. S3Region: storageConf.S3Region,
  99. S3Endpoint: storageConf.S3Endpoint,
  100. RemoteBucket: remoteStorageMountedLocation.Bucket,
  101. RemotePath: string(dest),
  102. })
  103. if fetchAndWriteErr != nil {
  104. return fmt.Errorf("volume server %s fetchAndWrite %s/$s: %v", assignResult.Url, req.Directory, req.Name, fetchAndWriteErr)
  105. }
  106. return nil
  107. })
  108. if err != nil {
  109. return nil, err
  110. }
  111. chunks = append(chunks, &filer_pb.FileChunk{
  112. FileId: assignResult.Fid,
  113. Offset: offset,
  114. Size: uint64(size),
  115. Mtime: time.Now().Unix(),
  116. Fid: &filer_pb.FileId{
  117. VolumeId: uint32(fileId.VolumeId),
  118. FileKey: uint64(fileId.Key),
  119. Cookie: uint32(fileId.Cookie),
  120. },
  121. })
  122. }
  123. garbage := entry.Chunks
  124. newEntry := entry.ShallowClone()
  125. newEntry.Chunks = chunks
  126. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  127. newEntry.Remote.LocalMtime = time.Now().Unix()
  128. // this skips meta data log events
  129. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  130. return nil, err
  131. }
  132. fs.filer.DeleteChunks(garbage)
  133. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  134. resp.Entry = entry.ToProtoEntry()
  135. return resp, nil
  136. }