From c218ef20c760fafde03430cac2066586cd2cf4d0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 6 Sep 2021 15:10:55 -0700 Subject: [PATCH] filer.remote.sync: automatically detect the primary remote storage --- weed/command/filer_remote_sync.go | 63 +++++++++++++---------- weed/command/filer_remote_sync_buckets.go | 26 ++++++++-- weed/pb/remote.proto | 1 + weed/pb/remote_pb/remote.pb.go | 52 ++++++++++++------- 4 files changed, 93 insertions(+), 49 deletions(-) diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 18e8fcc3c..0324a8103 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" + "os" "time" ) @@ -47,7 +48,7 @@ var ( func init() { cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") - remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") + remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") @@ -66,9 +67,15 @@ var cmdFilerRemoteSynchronize = &Command{ There are two modes: 1)Write back one mounted folder to remote storage + weed filer.remote.sync -dir=/mount/s3_on_cloud + 2)Watch /buckets folder and write back all changes. Any new buckets will be created in this remote storage. + + # if there is only one remote storage configured + weed filer.remote.sync + # if there are multiple remote storages configured weed filer.remote.sync -createBucketAt=cloud1 `, @@ -91,32 +98,18 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { *remoteSyncOptions.readChunkFromFiler, ) - storageName := *remoteSyncOptions.createBucketAt - if storageName != "" { - - remoteSyncOptions.bucketsDir = "/buckets" - // check buckets again - remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { - resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - remoteSyncOptions.bucketsDir = resp.DirBuckets - return nil - }) - - fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName) - util.RetryForever("filer.remote.sync buckets "+storageName, func() error { - return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource) - }, func(err error) bool { - if err != nil { - glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err) - } - return true - }) - } + remoteSyncOptions.bucketsDir = "/buckets" + // check buckets again + remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + remoteSyncOptions.bucketsDir = resp.DirBuckets + return nil + }) - if dir != "" { + if dir != "" && dir != remoteSyncOptions.bucketsDir { fmt.Printf("synchronize %s to remote storage...\n", dir) util.RetryForever("filer.remote.sync "+dir, func() error { return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) @@ -126,7 +119,25 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { } return true }) + return true } + // read filer remote storage mount mappings + if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil { + fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) + return true + } + + // synchronize /buckets folder + fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir) + util.RetryForever("filer.remote.sync buckets", func() error { + return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err) + } + return true + }) return true + } diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go index 92383614b..70f9f49c1 100644 --- a/weed/command/filer_remote_sync_buckets.go +++ b/weed/command/filer_remote_sync_buckets.go @@ -51,6 +51,19 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. // this directory is imported from "remote.mount.buckets" or "remote.mount" return nil } + if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" { + *option.createBucketAt = option.mappings.PrimaryBucketStorageName + glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt) + } + if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" { + for k := range option.mappings.Mappings { + *option.createBucketAt = k + glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt) + } + } + if *option.createBucketAt == "" { + return nil + } remoteConf, found := option.remoteConfs[*option.createBucketAt] if !found { return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt) @@ -72,7 +85,7 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. glog.V(0).Infof("create bucket %s", bucketName) if err := client.CreateBucket(bucketName); err != nil { - return err + return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err) } bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) @@ -95,12 +108,12 @@ func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name) if err != nil { - return err + return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err) } glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket) if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil { - return err + return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err) } bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) @@ -351,6 +364,7 @@ func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { } option.remoteConfs = make(map[string]*remote_pb.RemoteConf) + var lastConfName string err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error { if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { return nil @@ -360,8 +374,14 @@ func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) } option.remoteConfs[conf.Name] = conf + lastConfName = conf.Name return nil }, "", false, math.MaxUint32) + if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 { + glog.V(0).Infof("%s is set to the default remote storage", lastConfName) + option.mappings.PrimaryBucketStorageName = lastConfName + } + return } diff --git a/weed/pb/remote.proto b/weed/pb/remote.proto index 784dc2f7b..65722c9ae 100644 --- a/weed/pb/remote.proto +++ b/weed/pb/remote.proto @@ -66,6 +66,7 @@ message RemoteConf { message RemoteStorageMapping { map mappings = 1; + string primary_bucket_storage_name = 2; } message RemoteStorageLocation { string name = 1; diff --git a/weed/pb/remote_pb/remote.pb.go b/weed/pb/remote_pb/remote.pb.go index 2cf9f72b8..fef9556fe 100644 --- a/weed/pb/remote_pb/remote.pb.go +++ b/weed/pb/remote_pb/remote.pb.go @@ -400,7 +400,8 @@ type RemoteStorageMapping struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Mappings map[string]*RemoteStorageLocation `protobuf:"bytes,1,rep,name=mappings,proto3" json:"mappings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Mappings map[string]*RemoteStorageLocation `protobuf:"bytes,1,rep,name=mappings,proto3" json:"mappings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + PrimaryBucketStorageName string `protobuf:"bytes,2,opt,name=primary_bucket_storage_name,json=primaryBucketStorageName,proto3" json:"primary_bucket_storage_name,omitempty"` } func (x *RemoteStorageMapping) Reset() { @@ -442,6 +443,13 @@ func (x *RemoteStorageMapping) GetMappings() map[string]*RemoteStorageLocation { return nil } +func (x *RemoteStorageMapping) GetPrimaryBucketStorageName() string { + if x != nil { + return x.PrimaryBucketStorageName + } + return "" +} + type RemoteStorageLocation struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -619,31 +627,35 @@ var file_remote_proto_rawDesc = []byte{ 0x79, 0x18, 0x42, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x43, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0xc0, + 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x6a, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0xff, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x49, 0x0a, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e, - 0x67, 0x73, 0x1a, 0x5d, 0x0a, 0x0d, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, - 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4c, 0x6f, - 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0x57, 0x0a, 0x15, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, - 0x67, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x42, 0x50, 0x0a, 0x10, 0x73, 0x65, - 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a, - 0x46, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, - 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, - 0x70, 0x62, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x67, 0x73, 0x12, 0x3d, 0x0a, 0x1b, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x5f, 0x62, 0x75, + 0x63, 0x6b, 0x65, 0x74, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x70, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, + 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, + 0x65, 0x1a, 0x5d, 0x0a, 0x0d, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 0x36, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x2e, + 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4c, 0x6f, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0x57, 0x0a, 0x15, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, + 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x42, 0x50, 0x0a, 0x10, 0x73, 0x65, 0x61, + 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a, 0x46, + 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, + 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, + 0x62, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var (