You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							198 lines
						
					
					
						
							6.2 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							198 lines
						
					
					
						
							6.2 KiB
						
					
					
				
								package weed_server
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"strings"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/filer"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/operation"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/storage/needle"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
									"google.golang.org/protobuf/proto"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {
							 | 
						|
								
							 | 
						|
									// load all mappings
							 | 
						|
									mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// find mapping
							 | 
						|
									var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
							 | 
						|
									var localMountedDir string
							 | 
						|
									for k, loc := range mappings.Mappings {
							 | 
						|
										if strings.HasPrefix(req.Directory, k) {
							 | 
						|
											localMountedDir, remoteStorageMountedLocation = k, loc
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									if localMountedDir == "" {
							 | 
						|
										return nil, fmt.Errorf("%s is not mounted", req.Directory)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// find storage configuration
							 | 
						|
									storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									storageConf := &remote_pb.RemoteConf{}
							 | 
						|
									if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
							 | 
						|
										return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// find the entry
							 | 
						|
									entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
							 | 
						|
									if err == filer_pb.ErrNotFound {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
							 | 
						|
									if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
							 | 
						|
										return resp, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// detect storage option
							 | 
						|
									so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
							 | 
						|
									if err != nil {
							 | 
						|
										return resp, err
							 | 
						|
									}
							 | 
						|
									assignRequest, altRequest := so.ToAssignRequests(1)
							 | 
						|
								
							 | 
						|
									// find a good chunk size
							 | 
						|
									chunkSize := int64(5 * 1024 * 1024)
							 | 
						|
									chunkCount := entry.Remote.RemoteSize/chunkSize + 1
							 | 
						|
									for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
							 | 
						|
										chunkSize *= 2
							 | 
						|
										chunkCount = entry.Remote.RemoteSize/chunkSize + 1
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
							 | 
						|
								
							 | 
						|
									var chunks []*filer_pb.FileChunk
							 | 
						|
									var fetchAndWriteErr error
							 | 
						|
									var wg sync.WaitGroup
							 | 
						|
								
							 | 
						|
									limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
							 | 
						|
									for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
							 | 
						|
										localOffset := offset
							 | 
						|
								
							 | 
						|
										wg.Add(1)
							 | 
						|
										limitedConcurrentExecutor.Execute(func() {
							 | 
						|
											defer wg.Done()
							 | 
						|
											size := chunkSize
							 | 
						|
											if localOffset+chunkSize > entry.Remote.RemoteSize {
							 | 
						|
												size = entry.Remote.RemoteSize - localOffset
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// assign one volume server
							 | 
						|
											assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
							 | 
						|
											if err != nil {
							 | 
						|
												fetchAndWriteErr = err
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											if assignResult.Error != "" {
							 | 
						|
												fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
											fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
							 | 
						|
											if assignResult.Error != "" {
							 | 
						|
												fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
							 | 
						|
											for _, r := range assignResult.Replicas {
							 | 
						|
												replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
							 | 
						|
													Url:       r.Url,
							 | 
						|
													PublicUrl: r.PublicUrl,
							 | 
						|
													GrpcPort:  int32(r.GrpcPort),
							 | 
						|
												})
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											// tell filer to tell volume server to download into needles
							 | 
						|
											assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
							 | 
						|
											var etag string
							 | 
						|
											err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
							 | 
						|
												resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
							 | 
						|
													VolumeId:   uint32(fileId.VolumeId),
							 | 
						|
													NeedleId:   uint64(fileId.Key),
							 | 
						|
													Cookie:     uint32(fileId.Cookie),
							 | 
						|
													Offset:     localOffset,
							 | 
						|
													Size:       size,
							 | 
						|
													Replicas:   replicas,
							 | 
						|
													Auth:       string(assignResult.Auth),
							 | 
						|
													RemoteConf: storageConf,
							 | 
						|
													RemoteLocation: &remote_pb.RemoteStorageLocation{
							 | 
						|
														Name:   remoteStorageMountedLocation.Name,
							 | 
						|
														Bucket: remoteStorageMountedLocation.Bucket,
							 | 
						|
														Path:   string(dest),
							 | 
						|
													},
							 | 
						|
												})
							 | 
						|
												if fetchAndWriteErr != nil {
							 | 
						|
													return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
							 | 
						|
												} else {
							 | 
						|
													etag = resp.ETag
							 | 
						|
												}
							 | 
						|
												return nil
							 | 
						|
											})
							 | 
						|
								
							 | 
						|
											if err != nil && fetchAndWriteErr == nil {
							 | 
						|
												fetchAndWriteErr = err
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
								
							 | 
						|
											chunks = append(chunks, &filer_pb.FileChunk{
							 | 
						|
								
							 | 
						|
												FileId:       assignResult.Fid,
							 | 
						|
												Offset:       localOffset,
							 | 
						|
												Size:         uint64(size),
							 | 
						|
												ModifiedTsNs: time.Now().UnixNano(),
							 | 
						|
												ETag:         etag,
							 | 
						|
												Fid: &filer_pb.FileId{
							 | 
						|
													VolumeId: uint32(fileId.VolumeId),
							 | 
						|
													FileKey:  uint64(fileId.Key),
							 | 
						|
													Cookie:   uint32(fileId.Cookie),
							 | 
						|
												},
							 | 
						|
											})
							 | 
						|
										})
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									wg.Wait()
							 | 
						|
									if fetchAndWriteErr != nil {
							 | 
						|
										return nil, fetchAndWriteErr
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									garbage := entry.GetChunks()
							 | 
						|
								
							 | 
						|
									newEntry := entry.ShallowClone()
							 | 
						|
									newEntry.Chunks = chunks
							 | 
						|
									newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
							 | 
						|
									newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
							 | 
						|
								
							 | 
						|
									// this skips meta data log events
							 | 
						|
								
							 | 
						|
									if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
							 | 
						|
										fs.filer.DeleteChunks(chunks)
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									fs.filer.DeleteChunks(garbage)
							 | 
						|
								
							 | 
						|
									fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
							 | 
						|
								
							 | 
						|
									resp.Entry = newEntry.ToProtoEntry()
							 | 
						|
								
							 | 
						|
									return resp, nil
							 | 
						|
								
							 | 
						|
								}
							 |