From ab1b9697e6c1eabd3f097a6fc28caf40e73ba7ed Mon Sep 17 00:00:00 2001 From: guosj <515878133@qq.com> Date: Wed, 13 Jul 2022 17:28:20 +0800 Subject: [PATCH 1/5] supplement check duplicate accesskey --- weed/filer/s3iam_conf.go | 21 ++++++- weed/filer/s3iam_conf_test.go | 93 +++++++++++++++++++++++++++++- weed/s3api/auth_credentials.go | 5 ++ weed/shell/command_s3_configure.go | 15 ++--- 4 files changed, 121 insertions(+), 13 deletions(-) diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go index 891bf925b..acff1e1bb 100644 --- a/weed/filer/s3iam_conf.go +++ b/weed/filer/s3iam_conf.go @@ -2,9 +2,13 @@ package filer import ( "bytes" + "errors" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "io" ) func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error { @@ -23,3 +27,18 @@ func ProtoToText(writer io.Writer, config proto.Message) error { return m.Marshal(writer, config) } + +// CheckDuplicateAccessKey returns an error message when s3cfg has duplicate access keys +func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error { + accessKeySet := make(map[string]string) + for _, ident := range s3cfg.Identities { + for _, cred := range ident.Credentials { + if userName, found := accessKeySet[cred.AccessKey]; !found { + accessKeySet[cred.AccessKey] = ident.Name + } else { + return errors.New(fmt.Sprintf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)) + } + } + } + return nil +} diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go index da7d9c9f1..bd9eb85ae 100644 --- a/weed/filer/s3iam_conf_test.go +++ b/weed/filer/s3iam_conf_test.go @@ -2,9 +2,10 @@ package filer import ( "bytes" - . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "testing" + . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/stretchr/testify/assert" @@ -55,3 +56,93 @@ func TestS3Conf(t *testing.T) { assert.Equal(t, "some_access_key1", s3ConfSaved.Identities[0].Credentials[0].AccessKey) assert.Equal(t, "some_secret_key2", s3ConfSaved.Identities[1].Credentials[0].SecretKey) } + +func TestCheckDuplicateAccessKey(t *testing.T) { + var tests = []struct { + s3cfg *iam_pb.S3ApiConfiguration + err string + }{ + { + &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "some_name", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_ADMIN, + ACTION_READ, + ACTION_WRITE, + }, + }, + { + Name: "some_read_only_user", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key2", + SecretKey: "some_secret_key2", + }, + }, + Actions: []string{ + ACTION_READ, + ACTION_TAGGING, + ACTION_LIST, + }, + }, + }, + }, + "", + }, + { + &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "some_name", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_ADMIN, + ACTION_READ, + ACTION_WRITE, + }, + }, + { + Name: "some_read_only_user", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_READ, + ACTION_TAGGING, + ACTION_LIST, + }, + }, + }, + }, + "duplicate accessKey[some_access_key1], already configured in user[some_name]", + }, + } + for i, test := range tests { + err := CheckDuplicateAccessKey(test.s3cfg) + var errString string + if err == nil { + errString = "" + } else { + errString = err.Error() + } + if errString != test.err { + t.Errorf("[%d]: got: %s expected: %s", i, errString, test.err) + } + } +} diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index fb23d9ce9..f9e97ea22 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -109,6 +109,11 @@ func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []b glog.Warningf("unmarshal error: %v", err) return fmt.Errorf("unmarshal error: %v", err) } + + if err := filer.CheckDuplicateAccessKey(s3ApiConfiguration); err != nil { + return err + } + if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { return err } diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 0660b7889..422df2e75 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -2,14 +2,14 @@ package shell import ( "bytes" - "errors" "flag" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "io" "sort" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" ) @@ -165,15 +165,8 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io s3cfg.Identities = append(s3cfg.Identities, &identity) } - accessKeySet := make(map[string]string) - for _, ident := range s3cfg.Identities { - for _, cred := range ident.Credentials { - if userName, found := accessKeySet[cred.AccessKey]; !found { - accessKeySet[cred.AccessKey] = ident.Name - } else { - return errors.New(fmt.Sprintf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)) - } - } + if err = filer.CheckDuplicateAccessKey(s3cfg); err != nil { + return err } buf.Reset() From 354b7bdff04fe74244793713da9bae8a8e655523 Mon Sep 17 00:00:00 2001 From: guosj <515878133@qq.com> Date: Wed, 13 Jul 2022 22:49:03 +0800 Subject: [PATCH 2/5] replace errors.New(fmt.Sprintf(...)) with fmt.Errorf() --- weed/filer/s3iam_conf.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go index acff1e1bb..d8f3c2445 100644 --- a/weed/filer/s3iam_conf.go +++ b/weed/filer/s3iam_conf.go @@ -2,7 +2,6 @@ package filer import ( "bytes" - "errors" "fmt" "io" @@ -36,7 +35,7 @@ func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error { if userName, found := accessKeySet[cred.AccessKey]; !found { accessKeySet[cred.AccessKey] = ident.Name } else { - return errors.New(fmt.Sprintf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)) + return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName) } } } From 308a48c0c20627ea47ddbfe9ba092d1235ab965b Mon Sep 17 00:00:00 2001 From: guol-fnst Date: Wed, 13 Jul 2022 09:40:46 +0800 Subject: [PATCH 3/5] optimiz concurrency user can customize number of workers via env "GOMAXPROCS" --- weed/storage/disk_location.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index a2a63acbb..69d249259 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" "sync" "time" @@ -208,8 +209,14 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { workerNum := runtime.NumCPU() - if workerNum <= 10 { - workerNum = 10 + val, ok := os.LookupEnv("GOMAXPROCS") + if ok { + num, err := strconv.Atoi(val) + if err != nil || num < 1 { + num = 10 + glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10") + } + workerNum = num } l.concurrentLoadingVolumes(needleMapKind, workerNum) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) From 300b383cdf7ab607c92417256dfdc5ca52ea24b0 Mon Sep 17 00:00:00 2001 From: guol-fnst Date: Thu, 14 Jul 2022 10:37:13 +0800 Subject: [PATCH 4/5] use 10 or numCPU workers if env is not found --- weed/storage/disk_location.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 69d249259..8af8ea663 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -217,6 +217,10 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10") } workerNum = num + } else { + if workerNum <= 10 { + workerNum = 10 + } } l.concurrentLoadingVolumes(needleMapKind, workerNum) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) From fbd8f868a124af47d1c91b742d587c4fab6f2a32 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 14 Jul 2022 12:15:31 -0700 Subject: [PATCH 5/5] filer may have trouble to re-connect clientId is used twice: one for local metadata subscription, one for combined metadata subscription. --- weed/server/filer_grpc_server_sub_meta.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 745379e7c..6a4a5bb17 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -90,6 +90,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq peerAddress := findClientAddress(stream.Context(), 0) + // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata() + req.ClientId = -req.ClientId + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) if alreadyKnown { return fmt.Errorf("duplicated local subscription detected for client %s id %d", clientName, req.ClientId)