From d335f04de6861b571190c13bd7d65e9a0c02f187 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Jan 2020 09:09:55 -0800 Subject: [PATCH] support env variables to overwrite toml file --- go.mod | 2 +- .../repeated_vacuum/repeated_vacuum.go | 6 ++--- unmaintained/volume_tailer/volume_tailer.go | 2 +- weed/command/backup.go | 4 +--- weed/command/benchmark.go | 3 +-- weed/command/filer.go | 7 +++--- weed/command/filer_copy.go | 6 ++--- weed/command/filer_replication.go | 17 ++++++------- weed/command/master.go | 12 +++++----- weed/command/mount_std.go | 3 +-- weed/command/s3.go | 7 +++--- weed/command/scaffold.go | 8 +++++++ weed/command/shell.go | 3 +-- weed/command/upload.go | 6 ++--- weed/command/volume.go | 2 +- weed/command/webdav.go | 3 +-- weed/filer2/cassandra/cassandra_store.go | 6 ++--- weed/filer2/configuration.go | 3 +-- weed/filer2/etcd/etcd_store.go | 6 ++--- weed/filer2/filechunks_test.go | 20 ++++++++-------- weed/filer2/filerstore.go | 6 ++--- weed/filer2/leveldb/leveldb_store.go | 4 ++-- weed/filer2/leveldb2/leveldb2_store.go | 4 ++-- weed/filer2/mysql/mysql_store.go | 18 +++++++------- weed/filer2/postgres/postgres_store.go | 18 +++++++------- weed/filer2/redis/redis_cluster_store.go | 14 +++++------ weed/filer2/redis/redis_store.go | 8 +++---- weed/filer2/tikv/tikv_store.go | 4 ++-- weed/filesys/dirty_page_interval.go | 2 +- weed/notification/aws_sqs/aws_sqs_pub.go | 14 +++++------ weed/notification/configuration.go | 9 ++++--- .../gocdk_pub_sub/gocdk_pub_sub.go | 11 +++++---- .../google_pub_sub/google_pub_sub.go | 12 +++++----- weed/notification/kafka/kafka_queue.go | 10 ++++---- weed/notification/log/log_queue.go | 2 +- weed/replication/replicator.go | 4 ++-- weed/replication/sink/azuresink/azure_sink.go | 10 ++++---- weed/replication/sink/b2sink/b2_sink.go | 10 ++++---- weed/replication/sink/filersink/filer_sink.go | 19 ++++++++------- weed/replication/sink/gcssink/gcs_sink.go | 8 +++---- weed/replication/sink/replication_sink.go | 2 +- weed/replication/sink/s3sink/s3_sink.go | 18 +++++++------- weed/replication/source/filer_source.go | 15 ++++++------ weed/replication/sub/notification_aws_sqs.go | 14 +++++------ .../sub/notification_gocdk_pub_sub.go | 4 ++-- .../sub/notification_google_pub_sub.go | 12 +++++----- weed/replication/sub/notification_kafka.go | 14 +++++------ weed/replication/sub/notifications.go | 2 +- weed/security/tls.go | 4 ++-- weed/server/filer_server.go | 8 +++---- weed/server/master_server.go | 16 ++++++------- weed/server/volume_grpc_client_to_master.go | 9 +++---- weed/server/volume_server.go | 7 +++--- weed/server/webdav_server.go | 4 +--- weed/shell/command_fs_meta_notify.go | 6 ++--- weed/storage/backend/backend.go | 14 +++++------ weed/storage/backend/s3_backend/s3_backend.go | 14 +++++------ weed/storage/volume_vacuum.go | 2 +- weed/util/config.go | 13 +++++++++- weed/util/config_test.go | 24 +++++++++++++++++++ 60 files changed, 268 insertions(+), 247 deletions(-) create mode 100644 weed/util/config_test.go diff --git a/go.mod b/go.mod index 8fab3bb84..48879fd8c 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.4.0 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 // indirect - github.com/stretchr/testify v1.4.0 // indirect + github.com/stretchr/testify v1.4.0 github.com/syndtr/goleveldb v1.0.0 github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 28bcabb9b..718b6faa1 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -7,10 +7,8 @@ import ( "log" "math/rand" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,7 +21,7 @@ func main() { flag.Parse() util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for i := 0; i < *repeat; i++ { assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{Count: 1}) diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index f0ef51c09..3c2d36d22 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -25,7 +25,7 @@ func main() { flag.Parse() util2.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") vid := needle.VolumeId(*volumeId) diff --git a/weed/command/backup.go b/weed/command/backup.go index 0f6bed225..eb2b5ba4a 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -3,8 +3,6 @@ package command import ( "fmt" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" @@ -66,7 +64,7 @@ var cmdBackup = &Command{ func runBackup(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") if *s.volumeId == -1 { return false diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 26be1fe3a..382e7c850 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -15,7 +15,6 @@ import ( "sync" "time" - "github.com/spf13/viper" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -109,7 +108,7 @@ var ( func runBenchmark(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { diff --git a/weed/command/filer.go b/weed/command/filer.go index b1ceb46f5..ea8392fac 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -6,14 +6,13 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc/reflection" ) var ( @@ -145,7 +144,7 @@ func (fo *FilerOptions) startFiler() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index e74ea7d93..e5979d786 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -14,13 +14,13 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/spf13/viper" - "google.golang.org/grpc" ) var ( @@ -105,7 +105,7 @@ func runCopy(cmd *Command, args []string) bool { filerGrpcPort := filerPort + 10000 filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) - copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") ctx := context.Background() diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index c6e7f5dba..737f0d24a 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -39,7 +39,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("replication", true) util.LoadConfiguration("notification", true) - config := viper.GetViper() + config := util.GetViper() var notificationInput sub.NotificationInput @@ -47,8 +47,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { - viperSub := config.Sub("notification." + input.GetName()) - if err := input.Initialize(viperSub); err != nil { + if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification input for %s: %+v", input.GetName(), err) } @@ -66,10 +65,9 @@ func runFilerReplicate(cmd *Command, args []string) bool { // avoid recursive replication if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") { - sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer") - if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { - fromDir := sourceConfig.GetString("directory") - toDir := sinkConfig.GetString("directory") + if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") { + fromDir := config.GetString("source.filer.directory") + toDir := config.GetString("sink.filer.directory") if strings.HasPrefix(toDir, fromDir) { glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) } @@ -79,8 +77,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { var dataSink sink.ReplicationSink for _, sk := range sink.Sinks { if config.GetBool("sink." + sk.GetName() + ".enabled") { - viperSub := config.Sub("sink." + sk.GetName()) - if err := sk.Initialize(viperSub); err != nil { + if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize sink for %s: %+v", sk.GetName(), err) } @@ -98,7 +95,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { return true } - replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink) + replicator := replication.NewReplicator(config, "source.filer.", dataSink) for { key, m, err := notificationInput.ReceiveMessage() diff --git a/weed/command/master.go b/weed/command/master.go index 8d0a3289c..c4b11119b 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -8,15 +8,15 @@ import ( "strings" "github.com/chrislusf/raft/protobuf" + "github.com/gorilla/mux" + "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" - "github.com/spf13/viper" - "google.golang.org/grpc/reflection" ) var ( @@ -102,7 +102,7 @@ func runMaster(cmd *Command, args []string) bool { func startMaster(masterOption MasterOptions, masterWhiteList []string) { - backend.LoadConfiguration(viper.GetViper()) + backend.LoadConfiguration(util.GetViper()) myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) @@ -115,7 +115,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("Master startup error: %v", e) } // start raftServer - raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"), + raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder) @@ -129,7 +129,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 453531d00..891810e61 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -13,7 +13,6 @@ import ( "time" "github.com/jacobsa/daemonize" - "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" @@ -148,7 +147,7 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), FilerMountRootPath: mountRoot, Collection: collection, Replication: replication, diff --git a/weed/command/s3.go b/weed/command/s3.go index e004bb066..10a486657 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -1,18 +1,17 @@ package command import ( + "fmt" "net/http" "time" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "fmt" + "github.com/gorilla/mux" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/s3api" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" ) var ( @@ -69,7 +68,7 @@ func (s3opt *S3Options) startS3Server() bool { FilerGrpcAddress: filerGrpcAddress, DomainName: *s3opt.domainName, BucketsPath: *s3opt.filerBucketsPath, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 78eec277c..524bf5e13 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -14,6 +14,14 @@ var cmdScaffold = &Command{ Short: "generate basic configuration files", Long: `Generate filer.toml with all possible configurations for you to customize. + The options can also be overwritten by environment variables. + For example, the filer.toml mysql password can be overwritten by environment variable + export weed.mysql.password=some_password + Environment variable rules: + * Prefix fix with "WEED_" + * Upppercase the reset of variable name. + * Replace '.' with '_' + `, } diff --git a/weed/command/shell.go b/weed/command/shell.go index 34b5aef31..dcf70608f 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -6,7 +6,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) var ( @@ -31,7 +30,7 @@ var cmdShell = &Command{ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) - shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") var filerPwdErr error shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl) diff --git a/weed/command/upload.go b/weed/command/upload.go index 25e938d9b..d71046131 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -6,11 +6,9 @@ import ( "os" "path/filepath" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" - - "github.com/chrislusf/seaweedfs/weed/operation" ) var ( @@ -63,7 +61,7 @@ var cmdUpload = &Command{ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") if len(args) == 0 { if *upload.dir == "" { diff --git a/weed/command/volume.go b/weed/command/volume.go index b0f46bbf3..9d665d143 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -234,7 +234,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume")) + grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) go func() { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 371c4a9ad..0e6f89040 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -11,7 +11,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) var ( @@ -75,7 +74,7 @@ func (wo *WebDavOption) startWebDav() bool { ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ Filer: *wo.filer, FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"), + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), Collection: *wo.collection, Uid: uid, Gid: gid, diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index dcaab8bc4..f81ef946f 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -22,10 +22,10 @@ func (store *CassandraStore) GetName() string { return "cassandra" } -func (store *CassandraStore) Initialize(configuration util.Configuration) (err error) { +func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("keyspace"), - configuration.GetStringSlice("hosts"), + configuration.GetString(prefix+"keyspace"), + configuration.GetStringSlice(prefix+"hosts"), ) } diff --git a/weed/filer2/configuration.go b/weed/filer2/configuration.go index 7b05b53dc..a174117ea 100644 --- a/weed/filer2/configuration.go +++ b/weed/filer2/configuration.go @@ -17,8 +17,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { for _, store := range Stores { if config.GetBool(store.GetName() + ".enabled") { - viperSub := config.Sub(store.GetName()) - if err := store.Initialize(viperSub); err != nil { + if err := store.Initialize(config, store.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize store for %s: %+v", store.GetName(), err) } diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go index 2eb9e3e86..0f0c01426 100644 --- a/weed/filer2/etcd/etcd_store.go +++ b/weed/filer2/etcd/etcd_store.go @@ -28,13 +28,13 @@ func (store *EtcdStore) GetName() string { return "etcd" } -func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) { - servers := configuration.GetString("servers") +func (store *EtcdStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + servers := configuration.GetString(prefix + "servers") if servers == "" { servers = "localhost:2379" } - timeout := configuration.GetString("timeout") + timeout := configuration.GetString(prefix + "timeout") if timeout == "" { timeout = "3s" } diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index ed30c2abc..bb4a6c74d 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -350,21 +350,21 @@ func TestChunksReading(t *testing.T) { { Chunks: []*filer_pb.FileChunk{ {Offset: 0, Size: 43175947, FileId: "2,111fc2cbfac1", Mtime: 1}, - {Offset: 43175936, Size: 52981771-43175936, FileId: "2,112a36ea7f85", Mtime: 2}, - {Offset: 52981760, Size: 72564747-52981760, FileId: "4,112d5f31c5e7", Mtime: 3}, - {Offset: 72564736, Size: 133255179-72564736, FileId: "1,113245f0cdb6", Mtime: 4}, - {Offset: 133255168, Size: 137269259-133255168, FileId: "3,1141a70733b5", Mtime: 5}, - {Offset: 137269248, Size: 153578836-137269248, FileId: "1,114201d5bbdb", Mtime: 6}, + {Offset: 43175936, Size: 52981771 - 43175936, FileId: "2,112a36ea7f85", Mtime: 2}, + {Offset: 52981760, Size: 72564747 - 52981760, FileId: "4,112d5f31c5e7", Mtime: 3}, + {Offset: 72564736, Size: 133255179 - 72564736, FileId: "1,113245f0cdb6", Mtime: 4}, + {Offset: 133255168, Size: 137269259 - 133255168, FileId: "3,1141a70733b5", Mtime: 5}, + {Offset: 137269248, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", Mtime: 6}, }, Offset: 0, Size: 153578836, Expected: []*ChunkView{ {Offset: 0, Size: 43175936, FileId: "2,111fc2cbfac1", LogicOffset: 0}, - {Offset: 0, Size: 52981760-43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, - {Offset: 0, Size: 72564736-52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, - {Offset: 0, Size: 133255168-72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, - {Offset: 0, Size: 137269248-133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, - {Offset: 0, Size: 153578836-137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, + {Offset: 0, Size: 52981760 - 43175936, FileId: "2,112a36ea7f85", LogicOffset: 43175936}, + {Offset: 0, Size: 72564736 - 52981760, FileId: "4,112d5f31c5e7", LogicOffset: 52981760}, + {Offset: 0, Size: 133255168 - 72564736, FileId: "1,113245f0cdb6", LogicOffset: 72564736}, + {Offset: 0, Size: 137269248 - 133255168, FileId: "3,1141a70733b5", LogicOffset: 133255168}, + {Offset: 0, Size: 153578836 - 137269248, FileId: "1,114201d5bbdb", LogicOffset: 137269248}, }, }, } diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 0bb0bd611..ae25534ed 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -14,7 +14,7 @@ type FilerStore interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) // err == filer2.ErrNotFound if not found @@ -47,8 +47,8 @@ func (fsw *FilerStoreWrapper) GetName() string { return fsw.actualStore.GetName() } -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error { - return fsw.actualStore.Initialize(configuration) +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { + return fsw.actualStore.Initialize(configuration, prefix) } func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 4952b3b3a..44e6ac0eb 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -30,8 +30,8 @@ func (store *LevelDBStore) GetName() string { return "leveldb" } -func (store *LevelDBStore) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") +func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") return store.initialize(dir) } diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go index 8a16822ab..358d4d92a 100644 --- a/weed/filer2/leveldb2/leveldb2_store.go +++ b/weed/filer2/leveldb2/leveldb2_store.go @@ -30,8 +30,8 @@ func (store *LevelDB2Store) GetName() string { return "leveldb2" } -func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration) (err error) { - dir := configuration.GetString("dir") +func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + dir := configuration.GetString(prefix + "dir") return store.initialize(dir, 8) } diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go index d1b06ece5..63d99cd9d 100644 --- a/weed/filer2/mysql/mysql_store.go +++ b/weed/filer2/mysql/mysql_store.go @@ -26,16 +26,16 @@ func (store *MysqlStore) GetName() string { return "mysql" } -func (store *MysqlStore) Initialize(configuration util.Configuration) (err error) { +func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), - configuration.GetBool("interpolateParams"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), + configuration.GetBool(prefix+"interpolateParams"), ) } diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go index 3ec000fe0..27a0c2513 100644 --- a/weed/filer2/postgres/postgres_store.go +++ b/weed/filer2/postgres/postgres_store.go @@ -26,16 +26,16 @@ func (store *PostgresStore) GetName() string { return "postgres" } -func (store *PostgresStore) Initialize(configuration util.Configuration) (err error) { +func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("username"), - configuration.GetString("password"), - configuration.GetString("hostname"), - configuration.GetInt("port"), - configuration.GetString("database"), - configuration.GetString("sslmode"), - configuration.GetInt("connection_max_idle"), - configuration.GetInt("connection_max_open"), + configuration.GetString(prefix+"username"), + configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"hostname"), + configuration.GetInt(prefix+"port"), + configuration.GetString(prefix+"database"), + configuration.GetString(prefix+"sslmode"), + configuration.GetInt(prefix+"connection_max_idle"), + configuration.GetInt(prefix+"connection_max_open"), ) } diff --git a/weed/filer2/redis/redis_cluster_store.go b/weed/filer2/redis/redis_cluster_store.go index f1ad4b35c..eaaecb740 100644 --- a/weed/filer2/redis/redis_cluster_store.go +++ b/weed/filer2/redis/redis_cluster_store.go @@ -18,16 +18,16 @@ func (store *RedisClusterStore) GetName() string { return "redis_cluster" } -func (store *RedisClusterStore) Initialize(configuration util.Configuration) (err error) { +func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) { - configuration.SetDefault("useReadOnly", true) - configuration.SetDefault("routeByLatency", true) + configuration.SetDefault(prefix+"useReadOnly", true) + configuration.SetDefault(prefix+"routeByLatency", true) return store.initialize( - configuration.GetStringSlice("addresses"), - configuration.GetString("password"), - configuration.GetBool("useReadOnly"), - configuration.GetBool("routeByLatency"), + configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"password"), + configuration.GetBool(prefix+"useReadOnly"), + configuration.GetBool(prefix+"routeByLatency"), ) } diff --git a/weed/filer2/redis/redis_store.go b/weed/filer2/redis/redis_store.go index c56fa014c..9debdb070 100644 --- a/weed/filer2/redis/redis_store.go +++ b/weed/filer2/redis/redis_store.go @@ -18,11 +18,11 @@ func (store *RedisStore) GetName() string { return "redis" } -func (store *RedisStore) Initialize(configuration util.Configuration) (err error) { +func (store *RedisStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( - configuration.GetString("address"), - configuration.GetString("password"), - configuration.GetInt("database"), + configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"password"), + configuration.GetInt(prefix+"database"), ) } diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go index 4eb8cb90d..24e05e3ad 100644 --- a/weed/filer2/tikv/tikv_store.go +++ b/weed/filer2/tikv/tikv_store.go @@ -30,8 +30,8 @@ func (store *TikvStore) GetName() string { return "tikv" } -func (store *TikvStore) Initialize(configuration weed_util.Configuration) (err error) { - pdAddr := configuration.GetString("pdAddress") +func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + pdAddr := configuration.GetString(prefix + "pdAddress") return store.initialize(pdAddr) } diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go index 162b9be64..ec94c6df1 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/dirty_page_interval.go @@ -74,7 +74,7 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { nodes = append(nodes, &IntervalNode{ Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], Offset: nodeStart, - Size: nodeStop-nodeStart, + Size: nodeStop - nodeStart, Next: nil, }) } diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go index 4c1302abb..d881049dd 100644 --- a/weed/notification/aws_sqs/aws_sqs_pub.go +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -27,14 +27,14 @@ func (k *AwsSqsPub) GetName() string { return "aws_sqs" } -func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index 7f8765cc3..36211692c 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -11,7 +11,7 @@ type MessageQueue interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error SendMessage(key string, message proto.Message) error } @@ -21,7 +21,7 @@ var ( Queue MessageQueue ) -func LoadConfiguration(config *viper.Viper) { +func LoadConfiguration(config *viper.Viper, prefix string) { if config == nil { return @@ -30,9 +30,8 @@ func LoadConfiguration(config *viper.Viper) { validateOneEnabledQueue(config) for _, queue := range MessageQueues { - if config.GetBool(queue.GetName() + ".enabled") { - viperSub := config.Sub(queue.GetName()) - if err := queue.Initialize(viperSub); err != nil { + if config.GetBool(prefix + queue.GetName() + ".enabled") { + if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification for %s: %+v", queue.GetName(), err) } diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index ebf44ea6f..706261b3a 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -18,12 +18,13 @@ import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/notification" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" @@ -43,8 +44,8 @@ func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSub) Initialize(config util.Configuration) error { - k.topicURL = config.GetString("topic_url") +func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { + k.topicURL = configuration.GetString(prefix + "topic_url") glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) topic, err := pubsub.OpenTopic(context.Background(), k.topicURL) if err != nil { diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go index 7b26bfe38..363a86eb6 100644 --- a/weed/notification/google_pub_sub/google_pub_sub.go +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string { return "google_pub_sub" } -func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go index fd545722b..8d83b5892 100644 --- a/weed/notification/kafka/kafka_queue.go +++ b/weed/notification/kafka/kafka_queue.go @@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string { return "kafka" } -func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go index dcc038dfc..1ca4786a1 100644 --- a/weed/notification/log/log_queue.go +++ b/weed/notification/log/log_queue.go @@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string { return "log" } -func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { +func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) { return nil } diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 7353cdc91..a0ef6591c 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -18,10 +18,10 @@ type Replicator struct { source *source.FilerSource } -func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator { +func NewReplicator(sourceConfig util.Configuration, configPrefix string, dataSink sink.ReplicationSink) *Replicator { source := &source.FilerSource{} - source.Initialize(sourceConfig) + source.Initialize(sourceConfig, configPrefix) dataSink.SetSourceFiler(source) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 6381908a1..a0b1a41ab 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -35,12 +35,12 @@ func (g *AzureSink) GetSinkToDirectory() string { return g.dir } -func (g *AzureSink) Initialize(configuration util.Configuration) error { +func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("account_name"), - configuration.GetString("account_key"), - configuration.GetString("container"), - configuration.GetString("directory"), + configuration.GetString(prefix+"account_name"), + configuration.GetString(prefix+"account_key"), + configuration.GetString(prefix+"container"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 35c2230fa..8c80a64bd 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -31,12 +31,12 @@ func (g *B2Sink) GetSinkToDirectory() string { return g.dir } -func (g *B2Sink) Initialize(configuration util.Configuration) error { +func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("b2_account_id"), - configuration.GetString("b2_master_application_key"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"b2_account_id"), + configuration.GetString(prefix+"b2_master_application_key"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 4790d1562..de99fbe1c 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -3,10 +3,11 @@ package filersink import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -38,13 +39,13 @@ func (fs *FilerSink) GetSinkToDirectory() string { return fs.dir } -func (fs *FilerSink) Initialize(configuration util.Configuration) error { +func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), - configuration.GetString("replication"), - configuration.GetString("collection"), - configuration.GetInt("ttlSec"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), + configuration.GetString(prefix+"replication"), + configuration.GetString(prefix+"collection"), + configuration.GetInt(prefix+"ttlSec"), ) } @@ -59,7 +60,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, fs.replication = replication fs.collection = collection fs.ttlSec = int32(ttlSec) - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index abd7c49b9..5aa978ab8 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -34,11 +34,11 @@ func (g *GcsSink) GetSinkToDirectory() string { return g.dir } -func (g *GcsSink) Initialize(configuration util.Configuration) error { +func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error { return g.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index dd54f0005..208bbdf87 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -9,7 +9,7 @@ import ( type ReplicationSink interface { GetName() string - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 4cff341d0..e4e097c0f 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -39,16 +39,16 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { return s3sink.dir } -func (s3sink *S3Sink) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region")) - glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket")) - glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory")) +func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) + glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) return s3sink.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("bucket"), - configuration.GetString("directory"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"bucket"), + configuration.GetString(prefix+"directory"), ) } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index aef13be75..c3ea44671 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -3,13 +3,14 @@ package source import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" - "google.golang.org/grpc" "io" "net/http" "strings" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -25,17 +26,17 @@ type FilerSource struct { Dir string } -func (fs *FilerSource) Initialize(configuration util.Configuration) error { +func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error { return fs.initialize( - configuration.GetString("grpcAddress"), - configuration.GetString("directory"), + configuration.GetString(prefix+"grpcAddress"), + configuration.GetString(prefix+"directory"), ) } func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.Dir = dir - fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") return nil } diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index bed26c79c..06869e619 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -27,14 +27,14 @@ func (k *AwsSqsInput) GetName() string { return "aws_sqs" } -func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index eddba9ff8..9726096e5 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -27,8 +27,8 @@ func (k *GoCDKPubSubInput) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error { - subURL := config.GetString("sub_url") +func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { + subURL := configuration.GetString(prefix + "sub_url") glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index ad6b42a2e..a950bb42b 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -27,13 +27,13 @@ func (k *GooglePubSubInput) GetName() string { return "google_pub_sub" } -func (k *GooglePubSubInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 1a86a8307..fa9cfad9b 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -28,14 +28,14 @@ func (k *KafkaInput) GetName() string { return "kafka" } -func (k *KafkaInput) Initialize(configuration util.Configuration) error { - glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), - configuration.GetString("offsetFile"), - configuration.GetInt("offsetSaveIntervalSeconds"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), + configuration.GetString(prefix+"offsetFile"), + configuration.GetInt(prefix+"offsetSaveIntervalSeconds"), ) } diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 66fbef824..8a2668f98 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -9,7 +9,7 @@ type NotificationInput interface { // GetName gets the name to locate the configuration in sync.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) } diff --git a/weed/security/tls.go b/weed/security/tls.go index e81ba4831..f4f525ede 100644 --- a/weed/security/tls.go +++ b/weed/security/tls.go @@ -22,7 +22,7 @@ func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption { glog.Errorf("load cert/key error: %v", err) return nil } - caCert, err := ioutil.ReadFile(config.GetString("ca")) + caCert, err := ioutil.ReadFile(config.GetString(component + ".ca")) if err != nil { glog.Errorf("read ca cert file error: %v", err) return nil @@ -49,7 +49,7 @@ func LoadClientTLS(config *viper.Viper, component string) grpc.DialOption { glog.Errorf("load cert/key error: %v", err) return grpc.WithInsecure() } - caCert, err := ioutil.ReadFile(config.GetString("ca")) + caCert, err := ioutil.ReadFile(config.GetString(component + ".ca")) if err != nil { glog.Errorf("read ca cert file error: %v", err) return grpc.WithInsecure() diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3a2eca6d4..72cca1f6f 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -14,8 +14,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" @@ -61,7 +59,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs = &FilerServer{ option: option, - grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), } if len(option.Masters) == 0 { @@ -72,7 +70,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go fs.filer.KeepConnectedToMaster() - v := viper.GetViper() + v := util.GetViper() if !util.LoadConfiguration("filer", false) { v.Set("leveldb2.enabled", true) v.Set("leveldb2.dir", option.DefaultLevelDbDir) @@ -86,7 +84,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") fs.filer.LoadConfiguration(v) - notification.LoadConfiguration(v.Sub("notification")) + notification.LoadConfiguration(v, "notification.") handleStaticResources(defaultMux) if !option.DisableHttp { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 33a5129da..b3cc310e6 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -14,6 +14,9 @@ import ( "time" "github.com/chrislusf/raft" + "github.com/gorilla/mux" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" @@ -22,9 +25,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" - "github.com/gorilla/mux" - "github.com/spf13/viper" - "google.golang.org/grpc" ) const ( @@ -69,7 +69,7 @@ type MasterServer struct { func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { - v := viper.GetViper() + v := util.GetViper() signingKey := v.GetString("jwt.signing.key") v.SetDefault("jwt.signing.expires_after_seconds", 10) expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") @@ -83,7 +83,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } - grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master") + grpcDialOption := security.LoadClientTLS(v, "grpc.master") ms := &MasterServer{ option: option, preallocateSize: preallocateSize, @@ -183,7 +183,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ func (ms *MasterServer) startAdminScripts() { var err error - v := viper.GetViper() + v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") glog.V(0).Infof("adminScripts:\n%v", adminScripts) if adminScripts == "" { @@ -201,7 +201,7 @@ func (ms *MasterServer) startAdminScripts() { masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) var shellOptions shell.ShellOptions - shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") + shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") shellOptions.Masters = &masterAddress shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL) @@ -250,7 +250,7 @@ func (ms *MasterServer) startAdminScripts() { func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { var seq sequence.Sequencer - v := viper.GetViper() + v := util.GetViper() seqType := strings.ToLower(v.GetString(SequencerType)) glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) switch strings.ToLower(seqType) { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 6038752d2..dc47c2884 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -5,16 +5,17 @@ import ( "net" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" - "github.com/spf13/viper" - "google.golang.org/grpc" + + "golang.org/x/net/context" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" - "golang.org/x/net/context" ) func (vs *VolumeServer) GetMaster() string { @@ -26,7 +27,7 @@ func (vs *VolumeServer) heartbeat() { vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) - grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume") var err error var newLeader string diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index a406b36cc..0fdcf662a 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -7,8 +7,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/stats" - - "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -47,7 +46,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fileSizeLimitMB int, ) *VolumeServer { - v := viper.GetViper() + v := util.GetViper() signingKey := v.GetString("jwt.signing.key") v.SetDefault("jwt.signing.expires_after_seconds", 10) expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") @@ -64,7 +63,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, - grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index bdb6b61a9..d75869f30 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -17,8 +17,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" @@ -49,7 +47,7 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { ws = &WebDavServer{ option: option, - grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), Handler: &webdav.Handler{ FileSystem: fs, LockSystem: webdav.NewMemLS(), diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index a898df7a0..e2b2d22cc 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -5,8 +5,6 @@ import ( "fmt" "io" - "github.com/spf13/viper" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -42,8 +40,8 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i } util.LoadConfiguration("notification", true) - v := viper.GetViper() - notification.LoadConfiguration(v.Sub("notification")) + v := util.GetViper() + notification.LoadConfiguration(v, "notification.") ctx := context.Background() diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go index 6ea850543..6941ca5a1 100644 --- a/weed/storage/backend/backend.go +++ b/weed/storage/backend/backend.go @@ -35,7 +35,7 @@ type StringProperties interface { type StorageType string type BackendStorageFactory interface { StorageType() StorageType - BuildStorage(configuration StringProperties, id string) (BackendStorage, error) + BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error) } var ( @@ -48,19 +48,17 @@ func LoadConfiguration(config *viper.Viper) { StorageBackendPrefix := "storage.backend" - backendSub := config.Sub(StorageBackendPrefix) - for backendTypeName := range config.GetStringMap(StorageBackendPrefix) { backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)] if !found { glog.Fatalf("backend storage type %s not found", backendTypeName) } - backendTypeSub := backendSub.Sub(backendTypeName) - for backendStorageId := range backendSub.GetStringMap(backendTypeName) { - if !backendTypeSub.GetBool(backendStorageId + ".enabled") { + for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) { + if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") { continue } - backendStorage, buildErr := backendStorageFactory.BuildStorage(backendTypeSub.Sub(backendStorageId), backendStorageId) + backendStorage, buildErr := backendStorageFactory.BuildStorage(config, + StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId) if buildErr != nil { glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId) } @@ -82,7 +80,7 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) { glog.Warningf("storage type %s not found", storageBackend.Type) continue } - backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), storageBackend.Id) + backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id) if buildErr != nil { glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id) } diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 9f03cfa81..8d71861c2 100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -26,8 +26,8 @@ type S3BackendFactory struct { func (factory *S3BackendFactory) StorageType() backend.StorageType { return backend.StorageType("s3") } -func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, id string) (backend.BackendStorage, error) { - return newS3BackendStorage(configuration, id) +func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) { + return newS3BackendStorage(configuration, configPrefix, id) } type S3BackendStorage struct { @@ -39,13 +39,13 @@ type S3BackendStorage struct { conn s3iface.S3API } -func newS3BackendStorage(configuration backend.StringProperties, id string) (s *S3BackendStorage, err error) { +func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) { s = &S3BackendStorage{} s.id = id - s.aws_access_key_id = configuration.GetString("aws_access_key_id") - s.aws_secret_access_key = configuration.GetString("aws_secret_access_key") - s.region = configuration.GetString("region") - s.bucket = configuration.GetString("bucket") + s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id") + s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key") + s.region = configuration.GetString(configPrefix + "region") + s.bucket = configuration.GetString(configPrefix + "bucket") s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region) glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 0ca9016c8..523b37e34 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -356,7 +356,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) { var ( srcDatBackend, dstDatBackend backend.BackendStorageFile - dataFile *os.File + dataFile *os.File ) if dstDatBackend, err = createVolumeFile(dstDatName, preallocate, 0); err != nil { return diff --git a/weed/util/config.go b/weed/util/config.go index 7b86b749e..dfbfdbd82 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -1,8 +1,11 @@ package util import ( - "github.com/chrislusf/seaweedfs/weed/glog" + "strings" + "github.com/spf13/viper" + + "github.com/chrislusf/seaweedfs/weed/glog" ) type Configuration interface { @@ -37,3 +40,11 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { return true } + +func GetViper() *viper.Viper { + v := viper.GetViper() + v.AutomaticEnv() + v.SetEnvPrefix("weed") + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + return v +} diff --git a/weed/util/config_test.go b/weed/util/config_test.go new file mode 100644 index 000000000..659814a4a --- /dev/null +++ b/weed/util/config_test.go @@ -0,0 +1,24 @@ +package util + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAllKeysWithEnv(t *testing.T) { + + v := GetViper() + v.BindEnv("id") + v.BindEnv("foo", "foo") + + // bind and define environment variables (including a nested one) + os.Setenv("WEED_ID", "13") + os.Setenv("WEED_FOO_BAR", "baz") + + sub := v.Sub("foo") + + assert.Equal(t, "13", v.GetString("id")) + assert.Equal(t, "baz", sub.GetString("bar")) +}