You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							187 lines
						
					
					
						
							5.5 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							187 lines
						
					
					
						
							5.5 KiB
						
					
					
				| package shell | |
| 
 | |
| import ( | |
| 	"flag" | |
| 	"fmt" | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	"io" | |
| 	"sync" | |
| ) | |
| 
 | |
| func init() { | |
| 	Commands = append(Commands, &commandRemoteCache{}) | |
| } | |
| 
 | |
| type commandRemoteCache struct { | |
| } | |
| 
 | |
| func (c *commandRemoteCache) Name() string { | |
| 	return "remote.cache" | |
| } | |
| 
 | |
| func (c *commandRemoteCache) Help() string { | |
| 	return `cache the file content for mounted directories or files | |
|  | |
| 	# assume a remote storage is configured to name "cloud1" | |
| 	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy | |
| 	# mount and pull one bucket | |
| 	remote.mount -dir=/xxx -remote=cloud1/bucket | |
|  | |
| 	# after mount, run one of these command to cache the content of the files | |
| 	remote.cache -dir=/xxx | |
| 	remote.cache -dir=/xxx/some/sub/dir | |
| 	remote.cache -dir=/xxx/some/sub/dir -include=*.pdf | |
| 	remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt | |
| 	remote.cache -maxSize=1024000    # cache files smaller than 100K | |
| 	remote.cache -maxAge=3600        # cache files less than 1 hour old | |
|  | |
| 	This is designed to run regularly. So you can add it to some cronjob. | |
| 	If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy. | |
|  | |
| 	The actual data copying goes through volume severs in parallel. | |
|  | |
| ` | |
| } | |
| 
 | |
| func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { | |
| 
 | |
| 	remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) | |
| 
 | |
| 	dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer") | |
| 	concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading") | |
| 	fileFiler := newFileFilter(remoteMountCommand) | |
| 
 | |
| 	if err = remoteMountCommand.Parse(args); err != nil { | |
| 		return nil | |
| 	} | |
| 
 | |
| 	if *dir != "" { | |
| 		if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil { | |
| 			return err | |
| 		} | |
| 		return nil | |
| 	} | |
| 
 | |
| 	mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) | |
| 	if err != nil { | |
| 		return err | |
| 	} | |
| 
 | |
| 	for key, _ := range mappings.Mappings { | |
| 		if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil { | |
| 			return err | |
| 		} | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) error { | |
| 	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir) | |
| 	if detectErr != nil { | |
| 		jsonPrintln(writer, mappings) | |
| 		return detectErr | |
| 	} | |
| 
 | |
| 	// pull content from remote | |
| 	if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil { | |
| 		return fmt.Errorf("cache content data on %s: %v", localMountedDir, err) | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) { | |
| 
 | |
| 	err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error { | |
| 		if entry.IsDirectory { | |
| 			if !visitEntry(dirPath, entry) { | |
| 				return nil | |
| 			} | |
| 			subDir := dirPath.Child(entry.Name) | |
| 			if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil { | |
| 				return err | |
| 			} | |
| 		} else { | |
| 			if !visitEntry(dirPath, entry) { | |
| 				return nil | |
| 			} | |
| 		} | |
| 		return nil | |
| 	}) | |
| 	return | |
| } | |
| 
 | |
| func shouldCacheToLocal(entry *filer_pb.Entry) bool { | |
| 	if entry.IsDirectory { | |
| 		return false | |
| 	} | |
| 	if entry.RemoteEntry == nil { | |
| 		return false | |
| 	} | |
| 	if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 { | |
| 		return true | |
| 	} | |
| 	return false | |
| } | |
| 
 | |
| func mayHaveCachedToLocal(entry *filer_pb.Entry) bool { | |
| 	if entry.IsDirectory { | |
| 		return false | |
| 	} | |
| 	if entry.RemoteEntry == nil { | |
| 		return false // should not uncache an entry that is not in remote | |
| 	} | |
| 	if entry.RemoteEntry.LastLocalSyncTsNs > 0 { | |
| 		return true | |
| 	} | |
| 	return false | |
| } | |
| 
 | |
| func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error { | |
| 
 | |
| 	var wg sync.WaitGroup | |
| 	limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency) | |
| 	var executionErr error | |
| 
 | |
| 	traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool { | |
| 		if !shouldCacheToLocal(entry) { | |
| 			return true // true means recursive traversal should continue | |
| 		} | |
| 
 | |
| 		if !fileFilter.matches(entry) { | |
| 			return true | |
| 		} | |
| 
 | |
| 		wg.Add(1) | |
| 		limitedConcurrentExecutor.Execute(func() { | |
| 			defer wg.Done() | |
| 			fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name)) | |
| 
 | |
| 			remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name)) | |
| 
 | |
| 			if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil { | |
| 				fmt.Fprintf(writer, "CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err) | |
| 				if executionErr == nil { | |
| 					executionErr = fmt.Errorf("CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err) | |
| 				} | |
| 				return | |
| 			} | |
| 			fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name)) | |
| 		}) | |
| 
 | |
| 		return true | |
| 	}) | |
| 	wg.Wait() | |
| 
 | |
| 	if traverseErr != nil { | |
| 		return traverseErr | |
| 	} | |
| 	if executionErr != nil { | |
| 		return executionErr | |
| 	} | |
| 	return nil | |
| }
 |