package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func init() {
	Commands = append(Commands, &commandRemoteCache{})
}

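// commandRemoteCache implements the "remote.cache" shell command, which keeps
// a locally mounted directory synchronized with its remote storage
// counterpart: refreshing metadata, caching file content, and optionally
// removing local files that no longer exist on the remote.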
type commandRemoteCache struct {
}

func (c *commandRemoteCache) Name() string {
	return "remote.cache"
}

func (c *commandRemoteCache) Help() string {
	return `comprehensive synchronization and caching between local and remote storage

	# assume a remote storage is configured with the name "cloud1"
	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
	# mount and pull one bucket
	remote.mount -dir=/xxx -remote=cloud1/bucket

	# comprehensive sync and cache: update metadata, cache content, and remove deleted files
	remote.cache -dir=/xxx                                # sync metadata, cache content, and remove deleted files (default)
	remote.cache -dir=/xxx -cacheContent=false            # sync metadata and cleanup only, no caching
	remote.cache -dir=/xxx -deleteLocalExtra=false        # skip removal of local files missing from remote
	remote.cache -dir=/xxx -concurrent=32                 # with custom concurrency
	remote.cache -dir=/xxx -include=*.pdf                 # only sync PDF files
	remote.cache -dir=/xxx -exclude=*.tmp                 # exclude temporary files
	remote.cache -dir=/xxx -dryRun=true                   # show what would be done without making changes

	This command will:
	1. Synchronize metadata from remote storage
	2. Cache file content from remote by default
	3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)

	This command is designed to run regularly, so it can be scheduled as a cron job.

`
}

func (c *commandRemoteCache) HasTag(CommandTag) bool {
	return false
}

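// Do parses the command flags, verifies that -dir points at a remote mount,
// and delegates the actual synchronization to doComprehensiveSync.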
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

	dir := remoteCacheCommand.String("dir", "", "a directory in filer")
	cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
	deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
	concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
	dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
	fileFilter := newFileFilter(remoteCacheCommand)

	if err = remoteCacheCommand.Parse(args); err != nil {
		return nil
	}

	if *dir == "" {
		return fmt.Errorf("need to specify -dir option")
	}

	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
	if detectErr != nil {
		jsonPrintln(writer, mappings)
		return detectErr
	}

	// perform comprehensive sync
	return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFilter)
}

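// doComprehensiveSync reconciles the locally mounted directory with the
// remote storage in four steps: (1) list all remote entries, (2) optionally
// list all local entries, (3) diff the two sets into delete/update/cache work
// lists, and (4) execute those actions through the filer client.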
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {

	// visit remote storage
	remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return err
	}

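	// Translate the filer directory under the local mount point into its
	// corresponding location (bucket and path) on the remote storage.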
	remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)

	// Step 1: Collect all remote files
	remoteFiles := make(map[string]*filer_pb.RemoteEntry)
	err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
		localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
		fullPath := string(localDir.Child(name))
		remoteFiles[fullPath] = remoteEntry
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to traverse remote storage: %w", err)
	}

	fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))

	// Step 2: Collect all local files (only if we need to delete local extra files)
	localFiles := make(map[string]*filer_pb.Entry)
	if deleteLocalExtra {
		err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
			if entry.RemoteEntry != nil { // only consider files that are part of remote mount
				fullPath := string(dir.Child(entry.Name))
				localFiles[fullPath] = entry
			}
			return true
		})
		if err != nil {
			return fmt.Errorf("failed to traverse local directory: %w", err)
		}
		fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
	} else {
		fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
	}

	// Step 3: Determine actions needed
	var filesToDelete []string
	var filesToUpdate []string
	var filesToCache []string

	// Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
	if deleteLocalExtra {
		for localPath := range localFiles {
			if _, exists := remoteFiles[localPath]; !exists {
				filesToDelete = append(filesToDelete, localPath)
			}
		}
	}

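	// A remote file needs a metadata update when the local copy has never been
	// linked to a remote entry, when the ETags differ, or when the remote
	// modification time is newer than the one recorded locally.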
	// Find files to update/cache (exist remotely)
	for remotePath, remoteEntry := range remoteFiles {
		if deleteLocalExtra {
			// When deleteLocalExtra is enabled, we have localFiles to compare with
			if localEntry, exists := localFiles[remotePath]; exists {
				// File exists locally, check if it needs updating
				if localEntry.RemoteEntry == nil ||
					localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
					localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
					filesToUpdate = append(filesToUpdate, remotePath)
				}
				// Check if it needs caching
				if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
					filesToCache = append(filesToCache, remotePath)
				}
			} else {
				// File doesn't exist locally, needs to be created
				filesToUpdate = append(filesToUpdate, remotePath)
			}
		} else {
			// When deleteLocalExtra is disabled, we check each file individually
			// All remote files are candidates for update/creation
			filesToUpdate = append(filesToUpdate, remotePath)

			// For caching, we need to check if the local file exists and needs caching
			if shouldCache {
				// We need to look up the local file to check if it needs caching
				localDir, name := util.FullPath(remotePath).DirAndName()
				err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
					lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
						Directory: localDir,
						Name:      name,
					})
					if lookupErr == nil {
						localEntry := lookupResp.Entry
						if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
							filesToCache = append(filesToCache, remotePath)
						}
					}
					return nil // Don't propagate lookup errors here
				})
				if err != nil {
					// Log error but continue
					fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
				}
			}
		}
	}

	fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
		len(filesToDelete), len(filesToUpdate), len(filesToCache))

	if dryRun {
		fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
		for _, path := range filesToDelete {
			fmt.Fprintf(writer, "DELETE: %s\n", path)
		}
		for _, path := range filesToUpdate {
			fmt.Fprintf(writer, "UPDATE: %s\n", path)
		}
		for _, path := range filesToCache {
			fmt.Fprintf(writer, "CACHE: %s\n", path)
		}
		return nil
	}

	// Step 4: Execute actions
	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		ctx := context.Background()

		// Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
		if deleteLocalExtra {
			for _, pathToDelete := range filesToDelete {
				fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)

				dir, name := util.FullPath(pathToDelete).DirAndName()
				_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
					Directory:            dir,
					Name:                 name,
					IgnoreRecursiveError: false,
					IsDeleteData:         true,
					IsRecursive:          false,
					IsFromOtherCluster:   false,
				})
				if err != nil {
					fmt.Fprintf(writer, "failed: %v\n", err)
					return err
				}
				fmt.Fprintf(writer, "done\n")
			}
		}

		// Update metadata for files that exist on remote
		for _, pathToUpdate := range filesToUpdate {
			remoteEntry := remoteFiles[pathToUpdate]
			localDir, name := util.FullPath(pathToUpdate).DirAndName()

			fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)

			// Check if file exists locally
			lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
				Directory: string(localDir),
				Name:      name,
			})

			if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
				fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
				continue
			}

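			// Remote entries with zero size and zero mtime are treated as
			// directory placeholders rather than regular files.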
			isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
			if lookupErr == filer_pb.ErrNotFound {
				// Create new entry
				_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
					Directory: string(localDir),
					Entry: &filer_pb.Entry{
						Name:        name,
						IsDirectory: isDirectory,
						Attributes: &filer_pb.FuseAttributes{
							FileSize: uint64(remoteEntry.RemoteSize),
							Mtime:    remoteEntry.RemoteMtime,
							FileMode: uint32(0644),
						},
						RemoteEntry: remoteEntry,
					},
				})
				if createErr != nil {
					fmt.Fprintf(writer, "failed to create: %v\n", createErr)
					continue
				}
			} else {
				// Update existing entry
				existingEntry := lookupResp.Entry
				if existingEntry.RemoteEntry == nil {
					// This is a local file, skip to avoid overwriting
					fmt.Fprintf(writer, "skipped (local file)\n")
					continue
				}

				existingEntry.RemoteEntry = remoteEntry
				existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
				existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
				existingEntry.Attributes.Md5 = nil
				existingEntry.Chunks = nil
				existingEntry.Content = nil

				_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
					Directory: string(localDir),
					Entry:     existingEntry,
				})
				if updateErr != nil {
					fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
					continue
				}
			}
			fmt.Fprintf(writer, "done\n")
		}

		// Cache file content if requested
		if shouldCache && len(filesToCache) > 0 {
			fmt.Fprintf(writer, "Caching file content...\n")

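			// Cache files concurrently, bounded by the -concurrent limit.
			// Only the first failure is recorded; the remaining workers still
			// run to completion before that error is returned.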
			var wg sync.WaitGroup
			limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
			var executionErrLock sync.Mutex // guards executionErr across worker goroutines
			var executionErr error

			for _, pathToCache := range filesToCache {
				wg.Add(1)
				pathToCacheCopy := pathToCache // capture for closure
				limitedConcurrentExecutor.Execute(func() {
					defer wg.Done()

					// Get local entry (either from localFiles map or by lookup)
					var localEntry *filer_pb.Entry
					if deleteLocalExtra {
						localEntry = localFiles[pathToCacheCopy]
						if localEntry == nil {
							fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
							return
						}
					} else {
						// Look up the local entry since we don't have it in localFiles
						localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
						lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
							lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
								Directory: localDir,
								Name:      name,
							})
							if err == nil {
								localEntry = lookupResp.Entry
							}
							return err
						})
						if lookupErr != nil {
							fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
							return
						}
					}

					dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
					remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))

					fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)

					if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
						fmt.Fprintf(writer, "failed: %v\n", err)
						executionErrLock.Lock()
						if executionErr == nil {
							executionErr = err // keep only the first error
						}
						executionErrLock.Unlock()
						return
					}
					fmt.Fprintf(writer, "done\n")
				})
			}

			wg.Wait()
			if executionErr != nil {
				return executionErr
			}
		}

		return nil
	})
}

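// recursivelyTraverseDirectory walks dirPath depth-first through the filer,
// invoking visitEntry for every entry. If visitEntry returns false for a
// directory, that directory's children are not visited.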
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {

	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory {
			if !visitEntry(dirPath, entry) {
				return nil
			}
			subDir := dirPath.Child(entry.Name)
			if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
				return err
			}
		} else {
			if !visitEntry(dirPath, entry) {
				return nil
			}
		}
		return nil
	})
	return
}

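// shouldCacheToLocal reports whether a file's content still needs to be
// pulled from remote storage: it must be a non-empty file that is linked to a
// remote entry but has never been synced locally (LastLocalSyncTsNs == 0).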
func shouldCacheToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false
	}
	if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
		return true
	}
	return false
}

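// mayHaveCachedToLocal reports whether a file may already hold locally cached
// content, i.e. it is linked to a remote entry and has a recorded local sync
// timestamp, which makes it a candidate for uncaching.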
func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false // should not uncache an entry that is not in remote
	}
	if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
		return true
	}
	return false
}