diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go
index 987502c14..23a1eccad 100644
--- a/weed/shell/command_remote_cache.go
+++ b/weed/shell/command_remote_cache.go
@@ -4,12 +4,14 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"io"
+	"sync"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
+	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
 	"github.com/seaweedfs/seaweedfs/weed/util"
-	"io"
-	"sync"
 )

 func init() {
@@ -24,25 +26,28 @@ func (c *commandRemoteCache) Name() string {
 }

 func (c *commandRemoteCache) Help() string {
-	return `cache the file content for mounted directories or files
+	return `comprehensive synchronization and caching between local and remote storage

 	# assume a remote storage is configured to name "cloud1"
 	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
 	# mount and pull one bucket
 	remote.mount -dir=/xxx -remote=cloud1/bucket

-	# after mount, run one of these command to cache the content of the files
-	remote.cache -dir=/xxx
-	remote.cache -dir=/xxx/some/sub/dir
-	remote.cache -dir=/xxx/some/sub/dir -include=*.pdf
-	remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt
-	remote.cache -maxSize=1024000    # cache files smaller than 100K
-	remote.cache -maxAge=3600        # cache files less than 1 hour old
+	# comprehensive sync and cache: update metadata, cache content, and remove deleted files
+	remote.cache -dir=/xxx                          # sync metadata, cache content, and remove deleted files (default)
+	remote.cache -dir=/xxx -cacheContent=false      # sync metadata and cleanup only, no caching
+	remote.cache -dir=/xxx -deleteLocalExtra=false  # skip removal of local files missing from remote
+	remote.cache -dir=/xxx -concurrent=32           # with custom concurrency
+	remote.cache -dir=/xxx -include=*.pdf           # only sync PDF files
+	remote.cache -dir=/xxx -exclude=*.tmp           # exclude temporary files
+	remote.cache -dir=/xxx -dryRun=true             # show what would be done without making changes

-	This is designed to run regularly. So you can add it to some cronjob.
-	If a file is already synchronized with the remote copy, the file will be skipped to avoid unnecessary copy.
+	This command will:
+	1. Synchronize metadata from remote storage
+	2. Cache file content from remote by default
+	3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)

-	The actual data copying goes through volume severs in parallel.
+	This is designed to run regularly, so you can add it to a cronjob.
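+	Files whose content is already cached and unchanged on the remote are skipped,
+	so repeated runs avoid unnecessary copying.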
 `
 }

@@ -53,50 +58,312 @@ func (c *commandRemoteCache) HasTag(CommandTag) bool {

 func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

-	remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

-	dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub folders in filer")
-	concurrency := remoteMountCommand.Int("concurrent", 32, "concurrent file downloading")
-	fileFiler := newFileFilter(remoteMountCommand)
+	dir := remoteCacheCommand.String("dir", "", "a directory in filer")
+	cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
+	deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
+	concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
+	dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
+	fileFiler := newFileFilter(remoteCacheCommand)

-	if err = remoteMountCommand.Parse(args); err != nil {
+	if err = remoteCacheCommand.Parse(args); err != nil {
 		return nil
 	}

-	if *dir != "" {
-		if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFiler, *concurrency); err != nil {
-			return err
-		}
-		return nil
+	if *dir == "" {
+		return fmt.Errorf("need to specify -dir option")
 	}

-	mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
+	if detectErr != nil {
+		jsonPrintln(writer, mappings)
+		return detectErr
+	}
+
+	// perform comprehensive sync
+	return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler)
+}
+
+func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {
+
+	// visit remote storage
+	remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
 	if err != nil {
 		return err
 	}

-	for key, _ := range mappings.Mappings {
-		if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFiler, *concurrency); err != nil {
-			return err
+	remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
+
+	// Step 1: Collect all remote files
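+	// The map is keyed by the local full path each remote object maps to, so the
+	// remote listing can be compared directly against local entries further down.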
+	remoteFiles := make(map[string]*filer_pb.RemoteEntry)
+	err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
+		localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
+		fullPath := string(localDir.Child(name))
+		remoteFiles[fullPath] = remoteEntry
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("failed to traverse remote storage: %v", err)
+	}
+
+	fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
+
+	// Step 2: Collect all local files (only if we need to delete local extra files)
+	localFiles := make(map[string]*filer_pb.Entry)
+	if deleteLocalExtra {
+		err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
+			if entry.RemoteEntry != nil { // only consider files that are part of remote mount
+				fullPath := string(dir.Child(entry.Name))
+				localFiles[fullPath] = entry
+			}
+			return true
+		})
+		if err != nil {
+			return fmt.Errorf("failed to traverse local directory: %v", err)
 		}
+		fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
+	} else {
+		fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
 	}

-	return nil
-}
+	// Step 3: Determine actions needed
+	var filesToDelete []string
+	var filesToUpdate []string
+	var filesToCache []string

-func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFiler *FileFilter, concurrency int) error {
-	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir)
-	if detectErr != nil {
-		jsonPrintln(writer, mappings)
-		return detectErr
+	// Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
+	if deleteLocalExtra {
+		for localPath := range localFiles {
+			if _, exists := remoteFiles[localPath]; !exists {
+				filesToDelete = append(filesToDelete, localPath)
+			}
+		}
+	}
+
+	// Find files to update/cache (exist remotely)
+	for remotePath, remoteEntry := range remoteFiles {
+		if deleteLocalExtra {
+			// When deleteLocalExtra is enabled, we have localFiles to compare with
+			if localEntry, exists := localFiles[remotePath]; exists {
+				// File exists locally, check if it needs updating
+				if localEntry.RemoteEntry == nil ||
+					localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
+					localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
+					filesToUpdate = append(filesToUpdate, remotePath)
+				}
+				// Check if it needs caching
+				if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
+					filesToCache = append(filesToCache, remotePath)
+				}
+			} else {
+				// File doesn't exist locally, needs to be created
+				filesToUpdate = append(filesToUpdate, remotePath)
+			}
+		} else {
+			// When deleteLocalExtra is disabled, we check each file individually
+			// All remote files are candidates for update/creation
+			filesToUpdate = append(filesToUpdate, remotePath)
+
+			// For caching, we need to check if the local file exists and needs caching
+			if shouldCache {
+				// We need to look up the local file to check if it needs caching
+				localDir, name := util.FullPath(remotePath).DirAndName()
+				err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+					lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+						Directory: localDir,
+						Name:      name,
+					})
+					if lookupErr == nil {
+						localEntry := lookupResp.Entry
+						if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
+							filesToCache = append(filesToCache, remotePath)
+						}
+					}
+					return nil // Don't propagate lookup errors here
+				})
+				if err != nil {
+					// Log error but continue
+					fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
+				}
+			}
+		}
 	}

-	// pull content from remote
-	if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFiler, remoteStorageConf, concurrency); err != nil {
-		return fmt.Errorf("cache content data on %s: %v", localMountedDir, err)
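+	// At this point the plan is complete: filesToDelete holds local-only paths,
+	// filesToUpdate holds remote paths to create or refresh, and filesToCache holds
+	// filter-matching files whose content is not yet cached locally.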
+	fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
+		len(filesToDelete), len(filesToUpdate), len(filesToCache))
+
+	if dryRun {
+		fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
+		for _, path := range filesToDelete {
+			fmt.Fprintf(writer, "DELETE: %s\n", path)
+		}
+		for _, path := range filesToUpdate {
+			fmt.Fprintf(writer, "UPDATE: %s\n", path)
+		}
+		for _, path := range filesToCache {
+			fmt.Fprintf(writer, "CACHE: %s\n", path)
+		}
+		return nil
 	}

-	return nil
+	// Step 4: Execute actions
+	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		ctx := context.Background()
+
+		// Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
+		if deleteLocalExtra {
+			for _, pathToDelete := range filesToDelete {
+				fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)
+
+				dir, name := util.FullPath(pathToDelete).DirAndName()
+				_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
+					Directory:            dir,
+					Name:                 name,
+					IgnoreRecursiveError: false,
+					IsDeleteData:         true,
+					IsRecursive:          false,
+					IsFromOtherCluster:   false,
+				})
+				if err != nil {
+					fmt.Fprintf(writer, "failed: %v\n", err)
+					return err
+				}
+				fmt.Fprintf(writer, "done\n")
+			}
+		}
+
+		// Update metadata for files that exist on remote
+		for _, pathToUpdate := range filesToUpdate {
+			remoteEntry := remoteFiles[pathToUpdate]
+			localDir, name := util.FullPath(pathToUpdate).DirAndName()
+
+			fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)
+
+			// Check if file exists locally
+			lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+				Directory: string(localDir),
+				Name:      name,
+			})
+
+			if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
+				fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
+				continue
+			}
+
+			isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
+			if lookupErr == filer_pb.ErrNotFound {
+				// Create new entry
+				_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+					Directory: string(localDir),
+					Entry: &filer_pb.Entry{
+						Name:        name,
+						IsDirectory: isDirectory,
+						Attributes: &filer_pb.FuseAttributes{
+							FileSize: uint64(remoteEntry.RemoteSize),
+							Mtime:    remoteEntry.RemoteMtime,
+							FileMode: uint32(0644),
+						},
+						RemoteEntry: remoteEntry,
+					},
+				})
+				if createErr != nil {
+					fmt.Fprintf(writer, "failed to create: %v\n", createErr)
+					continue
+				}
+			} else {
+				// Update existing entry
+				existingEntry := lookupResp.Entry
+				if existingEntry.RemoteEntry == nil {
+					// This is a local file, skip to avoid overwriting
+					fmt.Fprintf(writer, "skipped (local file)\n")
+					continue
+				}
+
+				existingEntry.RemoteEntry = remoteEntry
+				existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
+				existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
+				existingEntry.Attributes.Md5 = nil
+				existingEntry.Chunks = nil
+				existingEntry.Content = nil
+
+				_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
+					Directory: string(localDir),
+					Entry:     existingEntry,
+				})
+				if updateErr != nil {
+					fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
+					continue
+				}
+			}
+			fmt.Fprintf(writer, "done\n")
+		}
+
+		// Cache file content if requested
+		if shouldCache && len(filesToCache) > 0 {
+			fmt.Fprintf(writer, "Caching file content...\n")
+
+			var wg sync.WaitGroup
+			limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
+			var executionErr error
+
+			for _, pathToCache := range filesToCache {
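+				// Fan the downloads out on the shared limited executor so at most
+				// `concurrency` cache operations run at once; wg.Wait() below blocks
+				// until every scheduled download finishes.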
+				wg.Add(1)
+				pathToCacheCopy := pathToCache // Capture for closure
+				limitedConcurrentExecutor.Execute(func() {
+					defer wg.Done()
+
+					// Get local entry (either from localFiles map or by lookup)
+					var localEntry *filer_pb.Entry
+					if deleteLocalExtra {
+						localEntry = localFiles[pathToCacheCopy]
+						if localEntry == nil {
+							fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
+							return
+						}
+					} else {
+						// Look up the local entry since we don't have it in localFiles
+						localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
+						lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+							lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+								Directory: localDir,
+								Name:      name,
+							})
+							if err == nil {
+								localEntry = lookupResp.Entry
+							}
+							return err
+						})
+						if lookupErr != nil {
+							fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
+							return
+						}
+					}
+
+					dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
+					remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))
+
+					fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
+
+					if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
+						fmt.Fprintf(writer, "failed: %v\n", err)
+						if executionErr == nil {
+							executionErr = err
+						}
+						return
+					}
+					fmt.Fprintf(writer, "done\n")
+				})
+			}
+
+			wg.Wait()
+			if executionErr != nil {
+				return executionErr
+			}
+		}
+
+		return nil
+	})
 }

 func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
@@ -145,48 +412,3 @@ func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
 	}
 	return false
 }
-
-func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
-
-	var wg sync.WaitGroup
-	limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
-	var executionErr error
-
-	traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
-		if !shouldCacheToLocal(entry) {
-			return true // true means recursive traversal should continue
-		}
-
-		if !fileFilter.matches(entry) {
-			return true
-		}
-
-		wg.Add(1)
-		limitedConcurrentExecutor.Execute(func() {
-			defer wg.Done()
-			fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))
-
-			remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))
-
-			if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
-				fmt.Fprintf(writer, "CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err)
-				if executionErr == nil {
-					executionErr = fmt.Errorf("CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err)
-				}
-				return
-			}
-			fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
-		})
-
-		return true
-	})
-	wg.Wait()
-
-	if traverseErr != nil {
-		return traverseErr
-	}
-	if executionErr != nil {
-		return executionErr
-	}
-	return nil
-}