From 49b5e47bd13032a720e0236e65f92eb96933838b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Aug 2021 12:38:26 -0700 Subject: [PATCH] retry forever with filer.remote.sync, and some refactoring --- weed/command/filer_remote_sync.go | 42 ++++++++++---------------- weed/filer/filer_remote_storage.go | 30 ++++++++++++++++++ weed/shell/command_remote_meta_sync.go | 28 +---------------- 3 files changed, 47 insertions(+), 53 deletions(-) diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 8b20957e4..41b9c67e8 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -70,43 +70,33 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") remoteSyncOptions.grpcDialOption = grpcDialOption + dir := *remoteSyncOptions.dir + filerAddress := *remoteSyncOptions.filerAddress + // read filer remote storage mount mappings - mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress) - if readErr != nil { - fmt.Printf("read mount mapping: %v", readErr) + _, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir) + if detectErr != nil { + fmt.Printf("read mount info: %v", detectErr) return false } filerSource := &source.FilerSource{} filerSource.DoInitialize( - *remoteSyncOptions.filerAddress, - pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress), + filerAddress, + pb.ServerToGrpcAddress(filerAddress), "/", // does not matter *remoteSyncOptions.readChunkFromFiler, ) - var found bool - for dir, remoteStorageMountLocation := range mappings.Mappings { - if *remoteSyncOptions.dir == dir { - found = true - storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name) - if readErr != nil { - fmt.Printf("read remote storage configuration for %s: %v", dir, readErr) - continue - } - fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir) - if err := util.Retry("filer.remote.sync "+dir, func() error { - return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) - }); err != nil { - fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err) - } - break + fmt.Printf("synchronize %s to remote storage...\n", dir) + util.RetryForever("filer.remote.sync "+dir, func() error { + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) + }, func(err error) bool { + if err != nil { + fmt.Printf("synchronize %s: %v\n", dir, err) } - } - if !found { - fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir) - return false - } + return true + }) return true } diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index 99ea1d3bb..dd8d2303c 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -197,3 +197,33 @@ func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, return } + +func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*filer_pb.RemoteStorageMapping, string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) { + + mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress) + if listErr != nil { + return nil, "", nil, nil, listErr + } + if dir == "" { + return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option") + } + + var localMountedDir string + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + for k, loc := range mappings.Mappings { + if strings.HasPrefix(dir, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir) + } + + // find remote storage configuration + remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name) + if err != nil { + return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err + } + + return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil +} diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index d2b88ec16..08d08a46d 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "io" - "strings" ) func init() { @@ -69,32 +68,7 @@ func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer } func detectMountInfo(commandEnv *CommandEnv, writer io.Writer, dir string) (*filer_pb.RemoteStorageMapping, string, *filer_pb.RemoteStorageLocation, *filer_pb.RemoteConf, error) { - mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) - if listErr != nil { - return nil, "", nil, nil, listErr - } - if dir == "" { - return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option") - } - - var localMountedDir string - var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation - for k, loc := range mappings.Mappings { - if strings.HasPrefix(dir, k) { - localMountedDir, remoteStorageMountedLocation = k, loc - } - } - if localMountedDir == "" { - return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir) - } - - // find remote storage configuration - remoteStorageConf, err := filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remoteStorageMountedLocation.Name) - if err != nil { - return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err - } - - return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil + return filer.DetectMountInfo(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, dir) } /*