198 lines
6.2 KiB

3 years ago
3 years ago
3 years ago
7 months ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/seaweedfs/seaweedfs/weed/operation"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {
  19. // load all mappings
  20. mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
  21. if err != nil {
  22. return nil, err
  23. }
  24. mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
  25. if err != nil {
  26. return nil, err
  27. }
  28. // find mapping
  29. var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
  30. var localMountedDir string
  31. for k, loc := range mappings.Mappings {
  32. if strings.HasPrefix(req.Directory, k) {
  33. localMountedDir, remoteStorageMountedLocation = k, loc
  34. }
  35. }
  36. if localMountedDir == "" {
  37. return nil, fmt.Errorf("%s is not mounted", req.Directory)
  38. }
  39. // find storage configuration
  40. storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
  41. if err != nil {
  42. return nil, err
  43. }
  44. storageConf := &remote_pb.RemoteConf{}
  45. if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
  46. return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  47. }
  48. // find the entry
  49. entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
  50. if err == filer_pb.ErrNotFound {
  51. return nil, err
  52. }
  53. resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
  54. if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
  55. return resp, nil
  56. }
  57. // detect storage option
  58. so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
  59. if err != nil {
  60. return resp, err
  61. }
  62. assignRequest, altRequest := so.ToAssignRequests(1)
  63. // find a good chunk size
  64. chunkSize := int64(5 * 1024 * 1024)
  65. chunkCount := entry.Remote.RemoteSize/chunkSize + 1
  66. for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
  67. chunkSize *= 2
  68. chunkCount = entry.Remote.RemoteSize/chunkSize + 1
  69. }
  70. dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
  71. var chunks []*filer_pb.FileChunk
  72. var fetchAndWriteErr error
  73. var wg sync.WaitGroup
  74. limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
  75. for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
  76. localOffset := offset
  77. wg.Add(1)
  78. limitedConcurrentExecutor.Execute(func() {
  79. defer wg.Done()
  80. size := chunkSize
  81. if localOffset+chunkSize > entry.Remote.RemoteSize {
  82. size = entry.Remote.RemoteSize - localOffset
  83. }
  84. // assign one volume server
  85. assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
  86. if err != nil {
  87. fetchAndWriteErr = err
  88. return
  89. }
  90. if assignResult.Error != "" {
  91. fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
  92. return
  93. }
  94. fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
  95. if assignResult.Error != "" {
  96. fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
  97. return
  98. }
  99. var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
  100. for _, r := range assignResult.Replicas {
  101. replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
  102. Url: r.Url,
  103. PublicUrl: r.PublicUrl,
  104. GrpcPort: int32(r.GrpcPort),
  105. })
  106. }
  107. // tell filer to tell volume server to download into needles
  108. assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
  109. var etag string
  110. err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  111. resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
  112. VolumeId: uint32(fileId.VolumeId),
  113. NeedleId: uint64(fileId.Key),
  114. Cookie: uint32(fileId.Cookie),
  115. Offset: localOffset,
  116. Size: size,
  117. Replicas: replicas,
  118. Auth: string(assignResult.Auth),
  119. RemoteConf: storageConf,
  120. RemoteLocation: &remote_pb.RemoteStorageLocation{
  121. Name: remoteStorageMountedLocation.Name,
  122. Bucket: remoteStorageMountedLocation.Bucket,
  123. Path: string(dest),
  124. },
  125. })
  126. if fetchAndWriteErr != nil {
  127. return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
  128. } else {
  129. etag = resp.ETag
  130. }
  131. return nil
  132. })
  133. if err != nil && fetchAndWriteErr == nil {
  134. fetchAndWriteErr = err
  135. return
  136. }
  137. chunks = append(chunks, &filer_pb.FileChunk{
  138. FileId: assignResult.Fid,
  139. Offset: localOffset,
  140. Size: uint64(size),
  141. ModifiedTsNs: time.Now().UnixNano(),
  142. ETag: etag,
  143. Fid: &filer_pb.FileId{
  144. VolumeId: uint32(fileId.VolumeId),
  145. FileKey: uint64(fileId.Key),
  146. Cookie: uint32(fileId.Cookie),
  147. },
  148. })
  149. })
  150. }
  151. wg.Wait()
  152. if fetchAndWriteErr != nil {
  153. return nil, fetchAndWriteErr
  154. }
  155. garbage := entry.GetChunks()
  156. newEntry := entry.ShallowClone()
  157. newEntry.Chunks = chunks
  158. newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
  159. newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
  160. // this skips meta data log events
  161. if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
  162. fs.filer.DeleteUncommittedChunks(chunks)
  163. return nil, err
  164. }
  165. fs.filer.DeleteChunks(entry.FullPath, garbage)
  166. fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
  167. resp.Entry = newEntry.ToProtoEntry()
  168. return resp, nil
  169. }