diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go index 77ca81f15..eee536ff6 100644 --- a/weed/filer/read_remote.go +++ b/weed/filer/read_remote.go @@ -30,10 +30,14 @@ func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMou Bucket: remoteMountedLocation.Bucket, Path: remoteMountedLocation.Path, } - remoteLocation.Path += string(fp)[len(localMountedDir):] + remoteLocation.Path = string(util.FullPath(remoteLocation.Path).Child(string(fp)[len(localMountedDir):])) return remoteLocation } +func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, remoteLocationPath string)(fp util.FullPath) { + return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):]) +} + func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *filer_pb.RemoteConf, remoteLocation *filer_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error { return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { _, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{ diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go index 17c5079de..7c4d895ec 100644 --- a/weed/remote_storage/s3/s3_storage_client.go +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -70,11 +70,7 @@ func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool { for _, content := range page.Contents { key := *content.Key - if len(pathKey) == 0 { - key = "/" + key - } else { - key = key[len(pathKey):] - } + key = "/" + key dir, name := util.FullPath(key).DirAndName() if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{ RemoteMtime: (*content.LastModified).Unix(), diff --git a/weed/shell/command_remote_cache.go b/weed/shell/command_remote_cache.go index 4e19b6a99..21c479258 100644 --- a/weed/shell/command_remote_cache.go +++ b/weed/shell/command_remote_cache.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" - "strings" ) func init() { @@ -53,33 +52,9 @@ func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io return nil } - mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) - if listErr != nil { - return listErr - } - if *dir == "" { - jsonPrintln(writer, mappings) - fmt.Fprintln(writer, "need to specify '-dir' option") - return nil - } - - var localMountedDir string - var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation - for k, loc := range mappings.Mappings { - if strings.HasPrefix(*dir, k) { - localMountedDir, remoteStorageMountedLocation = k, loc - } - } - if localMountedDir == "" { - jsonPrintln(writer, mappings) - fmt.Fprintf(writer, "%s is not mounted\n", *dir) - return nil - } - - // find remote storage configuration - remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) - if err != nil { - return err + localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + if detectErr != nil{ + return detectErr } // pull content from remote diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go new file mode 100644 index 000000000..7e111143c --- /dev/null +++ b/weed/shell/command_remote_meta_sync.go @@ -0,0 +1,208 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "strings" +) + +func init() { + Commands = append(Commands, &commandRemoteMetaSync{}) +} + +type commandRemoteMetaSync struct { +} + +func (c *commandRemoteMetaSync) Name() string { + return "remote.meta.sync" +} + +func (c *commandRemoteMetaSync) Help() string { + return `synchronize the local file meta data with the remote file metadata + + # assume a remote storage is configured to name "cloud1" + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=/xxx -remote=cloud1/bucket + + After mount, if the remote file can be changed, + run this command to synchronize the metadata of the mounted folder or any sub folder + + remote.meta.sync -dir=/xxx + remote.meta.sync -dir=/xxx/some/subdir + + This is designed to run regularly. So you can add it to some cronjob. + + If there are no other operations changing remote files, this operation is not needed. + +` +} + +func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMetaSyncCommand.String("dir", "", "a directory in filer") + + if err = remoteMetaSyncCommand.Parse(args); err != nil { + return nil + } + + localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir) + if detectErr != nil{ + return detectErr + } + + // pull metadata from remote + if err = pullMetadata(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf); err != nil { + return fmt.Errorf("cache content data: %v", err) + } + + return nil +} + +func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) { + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return "", nil, nil, listErr + } + if dir == "" { + jsonPrintln(writer, mappings) + return "", nil, nil, fmt.Errorf("need to specify '-dir' option") + } + + var localMountedDir string + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + jsonPrintln(writer, mappings) + return "", nil, nil, fmt.Errorf("%s is not mounted", dir) + } + + // find remote storage configuration + remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return "", nil, nil, err + } + + return localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil +} + +/* + This function update entry.RemoteEntry if the remote has any changes. + + To pull remote updates, or created for the first time, the criteria is: + entry == nil or (entry.RemoteEntry != nil and entry.RemoteEntry.RemoteTag != remote.RemoteTag) + After the meta pull, the entry.RemoteEntry will have: + remoteEntry.LastLocalSyncTsNs == 0 + Attributes.FileSize = uint64(remoteEntry.RemoteSize) + Attributes.Mtime = remoteEntry.RemoteMtime + remoteEntry.RemoteTag = actual remote tag + chunks = nil + + When reading the file content or pulling the file content in "remote.cache", the criteria is: + Attributes.FileSize > 0 and len(chunks) == 0 + After caching the file content, the entry.RemoteEntry will be + remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano() + Attributes.FileSize = uint64(remoteEntry.RemoteSize) + Attributes.Mtime = remoteEntry.RemoteMtime + chunks = non-emtpy + + When "weed filer.remote.sync" to upload local changes to remote, the criteria is: + Attributes.Mtime > remoteEntry.RemoteMtime + Right after "weed filer.remote.sync", the entry.RemoteEntry will be + remoteEntry.LastLocalSyncTsNs = time.Now.UnixNano() + remoteEntry.RemoteSize = actual remote size, which should equal to entry.Attributes.FileSize + remoteEntry.RemoteMtime = actual remote mtime, which should be a little greater than entry.Attributes.Mtime + remoteEntry.RemoteTag = actual remote tag + + + If entry does not exists, need to pull meta + If entry.RemoteEntry == nil, this is a new local change and should not be overwritten + If entry.RemoteEntry.RemoteTag != remoteEntry.RemoteTag { + the remote version is updated, need to pull meta + } + */ +func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, dirToCache util.FullPath, remoteConf *filer_pb.RemoteConf) error { + + // visit remote storage + remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache) + println("local :", localMountedDir) + println("remote:", remoteMountedLocation.Path) + println("local+:", dirToCache) + println("remote+:", remote.Path) + + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx := context.Background() + err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { + localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir) + fmt.Fprint(writer, localDir.Child(name)) + + lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: string(localDir), + Name: name, + }) + var existingEntry *filer_pb.Entry + if lookupErr != nil { + if lookupErr != filer_pb.ErrNotFound { + return lookupErr + } + } else { + existingEntry = lookupResponse.Entry + } + + if existingEntry == nil { + _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: string(localDir), + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: isDirectory, + Attributes: &filer_pb.FuseAttributes{ + FileSize: uint64(remoteEntry.RemoteSize), + Mtime: remoteEntry.RemoteMtime, + FileMode: uint32(0644), + }, + RemoteEntry: remoteEntry, + }, + }) + fmt.Fprintln(writer, " (create)") + return createErr + } else { + if existingEntry.RemoteEntry == nil { + // this is a new local change and should not be overwritten + fmt.Fprintln(writer, " (skip)") + return nil + } + if existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { + // the remote version is updated, need to pull meta + fmt.Fprintln(writer, " (update)") + return doSaveRemoteEntry(client, string(localDir), existingEntry, remoteEntry) + } + } + fmt.Fprintln(writer, " (skip)") + return nil + }) + return err + }) + + if err != nil { + return err + } + + return nil +} diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 2483fa5be..077c64e94 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -67,8 +67,8 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io return fmt.Errorf("find configuration for %s: %v", *remote, err) } - // pull metadata from remote - if err = c.pullMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { + // sync metadata from remote + if err = c.syncMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { return fmt.Errorf("pull metadata: %v", err) } @@ -111,7 +111,7 @@ func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandE } -func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error { +func (c *commandRemoteMount) syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error { // find existing directory, and ensure the directory is empty err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -146,58 +146,9 @@ func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writ return err } - // visit remote storage - remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) - if err != nil { - return err - } - - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx := context.Background() - err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { - localDir := dir + remoteDir - println(util.NewFullPath(localDir, name)) - - lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ - Directory: localDir, - Name: name, - }) - var existingEntry *filer_pb.Entry - if lookupErr != nil { - if lookupErr != filer_pb.ErrNotFound { - return lookupErr - } - } else { - existingEntry = lookupResponse.Entry - } - - if existingEntry == nil { - _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ - Directory: localDir, - Entry: &filer_pb.Entry{ - Name: name, - IsDirectory: isDirectory, - Attributes: &filer_pb.FuseAttributes{ - FileSize: uint64(remoteEntry.RemoteSize), - Mtime: remoteEntry.RemoteMtime, - FileMode: uint32(0644), - }, - RemoteEntry: remoteEntry, - }, - }) - return createErr - } else { - if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag { - return doSaveRemoteEntry(client, localDir, existingEntry, remoteEntry) - } - } - return nil - }) - return err - }) - - if err != nil { - return err + // pull metadata from remote + if err = pullMetadata(commandEnv, writer, util.FullPath(dir), remote, util.FullPath(dir), remoteConf); err != nil { + return fmt.Errorf("cache content data: %v", err) } return nil diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index f2119707e..85028b052 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -31,10 +31,14 @@ func (fp FullPath) Name() string { func (fp FullPath) Child(name string) FullPath { dir := string(fp) + noPrefix := name + if strings.HasPrefix(name, "/") { + noPrefix = name[1:] + } if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) + return FullPath(dir + noPrefix) } - return FullPath(dir + "/" + name) + return FullPath(dir + "/" + noPrefix) } func (fp FullPath) AsInode() uint64 {