From 0d817bc347188c2dd994245f992472690395c85d Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 11 Oct 2022 21:58:17 -0700 Subject: [PATCH 01/12] fix invalid memory address or nil pointer dereference on filer.sync fix https://github.com/seaweedfs/seaweedfs/issues/3826 --- weed/replication/sink/filersink/filer_sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 3af5a4a80..b922be568 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -54,7 +54,6 @@ func (fs *FilerSink) IsIncremental() bool { func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error { fs.isIncremental = configuration.GetBool(prefix + "is_incremental") fs.dataCenter = configuration.GetString(prefix + "dataCenter") - fs.executor = util.NewLimitedConcurrentExecutor(32) return fs.DoInitialize( "", configuration.GetString(prefix+"grpcAddress"), @@ -85,6 +84,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string, fs.diskType = diskType fs.grpcDialOption = grpcDialOption fs.writeChunkByFiler = writeChunkByFiler + fs.executor = util.NewLimitedConcurrentExecutor(32) return nil } From dff85e9c71efad812d92410dad56d6ab7c13c67c Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Oct 2022 00:03:14 -0700 Subject: [PATCH 02/12] fix error handling --- weed/filer/sqlite/sqlite_store.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go index 2bacf051a..202834fa2 100644 --- a/weed/filer/sqlite/sqlite_store.go +++ b/weed/filer/sqlite/sqlite_store.go @@ -63,9 +63,11 @@ func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (e var dbErr error store.DB, dbErr = sql.Open("sqlite", dbFile) if dbErr != nil { - store.DB.Close() - store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", dbFile, err) + if store.DB != nil { + store.DB.Close() + store.DB = nil + } + return fmt.Errorf("can not connect to %s error:%v", dbFile, dbErr) } if err = store.DB.Ping(); err != nil { From cea73ac00818c08db807bf0121188f2ac926dc0f Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Oct 2022 00:03:27 -0700 Subject: [PATCH 03/12] serialize sqlite operations fix https://github.com/seaweedfs/seaweedfs/issues/3827 --- weed/filer/sqlite/sqlite_store.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go index 202834fa2..6c9ca4ecc 100644 --- a/weed/filer/sqlite/sqlite_store.go +++ b/weed/filer/sqlite/sqlite_store.go @@ -74,6 +74,8 @@ func (store *SqliteStore) initialize(dbFile, createTable, upsertQuery string) (e return fmt.Errorf("connect to %s error:%v", dbFile, err) } + store.DB.SetMaxOpenConns(1) + if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil { return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err) } From e55076c46f4d4f4b08d940ac0a972f445776e029 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Oct 2022 00:38:32 -0700 Subject: [PATCH 04/12] cloud tier: add retry when copying data file fix https://github.com/seaweedfs/seaweedfs/issues/3828 --- weed/storage/backend/s3_backend/s3_backend.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 0b3db3c67..73b33716c 
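A note on PATCH 02 and PATCH 03 above: the error-handling fix reports dbErr (the error actually returned by sql.Open) and nil-guards the handle before closing it, and the serialization fix caps the database/sql pool at a single connection so every statement against the SQLite file runs in turn. A minimal sketch of that serialization pattern, assuming the same pure-Go driver family (modernc.org/sqlite, which registers itself as "sqlite"); the file and table names are placeholders, not the SeaweedFS schema:

package main

import (
	"database/sql"
	"log"

	_ "modernc.org/sqlite" // assumed driver; registers under the name "sqlite"
)

func main() {
	db, err := sql.Open("sqlite", "filer.db") // placeholder path
	if err != nil {
		log.Fatalf("open filer.db: %v", err)
	}
	defer db.Close()

	// One open connection means database/sql serializes all operations,
	// avoiding concurrent-writer contention on the single SQLite file.
	db.SetMaxOpenConns(1)

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v BLOB)`); err != nil {
		log.Fatalf("create table: %v", err)
	}
}
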
100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -2,6 +2,7 @@ package s3_backend import ( "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "io" "os" "strings" @@ -91,7 +92,10 @@ func (s *S3BackendStorage) CopyFile(f *os.File, fn func(progressed int64, percen glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key) - size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, s.storageClass, fn) + util.Retry("upload to S3", func() error { + size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, s.storageClass, fn) + return err + }) return } From a05725aea6b6c4e99a5b6d9541b94fd7bbd15ab9 Mon Sep 17 00:00:00 2001 From: zemul Date: Wed, 12 Oct 2022 22:14:49 +0800 Subject: [PATCH 05/12] filer: get directory metadata (#3833) --- weed/server/filer_server_handlers_read.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 8c3d4bcd8..645a3fb44 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -107,9 +107,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } + query := r.URL.Query() + if entry.IsDirectory() { if fs.option.DisableDirListing { - w.WriteHeader(http.StatusMethodNotAllowed) + w.WriteHeader(http.StatusForbidden) + return + } + if query.Get("metadata") == "true" { + writeJsonQuiet(w, r, http.StatusOK, entry) return } if entry.Attr.Mime == "" { @@ -125,7 +131,6 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } - query := r.URL.Query() if query.Get("metadata") == "true" { if query.Get("resolveManifest") == "true" { if entry.Chunks, _, err = filer.ResolveChunkManifest( From 401315f33750c7fa0374c00bcba8f26f8b67fe7b Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 12 Oct 2022 19:18:40 +0500 Subject: [PATCH 06/12] master fix interruption through ctrl+c (#3834) --- weed/command/master.go | 2 +- weed/util/grace/signal_handling.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/weed/command/master.go b/weed/command/master.go index a74389b1f..39dbf42ed 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -255,7 +255,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } grace.OnInterrupt(ms.Shutdown) - grace.OnInterrupt(grpcS.GracefulStop) + grace.OnInterrupt(grpcS.Stop) grace.OnReload(func() { if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { ms.Topo.HashicorpRaft.LeadershipTransfer() diff --git a/weed/util/grace/signal_handling.go b/weed/util/grace/signal_handling.go index 14b998796..0fc0f43e1 100644 --- a/weed/util/grace/signal_handling.go +++ b/weed/util/grace/signal_handling.go @@ -4,8 +4,11 @@ package grace import ( + "github.com/seaweedfs/seaweedfs/weed/glog" "os" "os/signal" + "reflect" + "runtime" "sync" "syscall" ) @@ -16,6 +19,10 @@ var interruptHookLock sync.RWMutex var reloadHooks = make([]func(), 0) var reloadHookLock sync.RWMutex +func GetFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} + func init() { signalChan = make(chan os.Signal, 1) signal.Notify(signalChan, @@ -38,6 +45,7 @@ func init() { } else { interruptHookLock.RLock() for _, hook := range interruptHooks { + glog.V(4).Infof("exec interrupt hook func name:%s", 
GetFunctionName(hook)) hook() } interruptHookLock.RUnlock() From bf5e45b66a29e15553ebc5b63b6d414ffca5ce6a Mon Sep 17 00:00:00 2001 From: LHHDZ Date: Thu, 13 Oct 2022 11:15:16 +0800 Subject: [PATCH 07/12] add acl helper functionalities (#3831) --- weed/s3api/s3_constants/acp_grantee_group.go | 12 + weed/s3api/s3_constants/header.go | 8 + weed/s3api/s3acl/acl_helper.go | 505 +++++++++++++ weed/s3api/s3acl/acl_helper_test.go | 708 +++++++++++++++++++ 4 files changed, 1233 insertions(+) create mode 100644 weed/s3api/s3acl/acl_helper.go create mode 100644 weed/s3api/s3acl/acl_helper_test.go diff --git a/weed/s3api/s3_constants/acp_grantee_group.go b/weed/s3api/s3_constants/acp_grantee_group.go index a315fb0f7..7058a4e9f 100644 --- a/weed/s3api/s3_constants/acp_grantee_group.go +++ b/weed/s3api/s3_constants/acp_grantee_group.go @@ -6,3 +6,15 @@ var ( GranteeGroupAuthenticatedUsers = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" GranteeGroupLogDelivery = "http://acs.amazonaws.com/groups/s3/LogDelivery" ) + +func ValidateGroup(group string) bool { + valid := true + switch group { + case GranteeGroupAllUsers: + case GranteeGroupLogDelivery: + case GranteeGroupAuthenticatedUsers: + default: + valid = false + } + return valid +} diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 5e19d67be..5037f4691 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -38,6 +38,14 @@ const ( AmzTagCount = "x-amz-tagging-count" X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key" + + // S3 ACL headers + AmzCannedAcl = "X-Amz-Acl" + AmzAclFullControl = "X-Amz-Grant-Full-Control" + AmzAclRead = "X-Amz-Grant-Read" + AmzAclWrite = "X-Amz-Grant-Write" + AmzAclReadAcp = "X-Amz-Grant-Read-Acp" + AmzAclWriteAcp = "X-Amz-Grant-Write-Acp" ) // Non-Standard S3 HTTP request constants diff --git a/weed/s3api/s3acl/acl_helper.go b/weed/s3api/s3acl/acl_helper.go new file mode 100644 index 000000000..e54e67556 --- /dev/null +++ b/weed/s3api/s3acl/acl_helper.go @@ -0,0 +1,505 @@ +package s3acl + +import ( + "encoding/json" + "encoding/xml" + "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3account" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util" + "net/http" + "strings" +) + +// GetAccountId get AccountId from request headers, AccountAnonymousId will be return if not presen +func GetAccountId(r *http.Request) string { + id := r.Header.Get(s3_constants.AmzAccountId) + if len(id) == 0 { + return s3account.AccountAnonymous.Id + } else { + return id + } +} + +// ExtractAcl extracts the acl from the request body, or from the header if request body is empty +func ExtractAcl(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, ownerId, accountId string) (grants []*s3.Grant, errCode s3err.ErrorCode) { + if r.Body != nil && r.Body != http.NoBody { + defer util.CloseRequest(r) + + var acp s3.AccessControlPolicy + err := xmlutil.UnmarshalXML(&acp, xml.NewDecoder(r.Body), "") + if err != nil || acp.Owner == nil || acp.Owner.ID == nil { + return nil, s3err.ErrInvalidRequest + } + + //owner should present && owner is immutable + if *acp.Owner.ID != ownerId { + glog.V(3).Infof("set acl denied! 
owner account is not consistent, request account id: %s, expect account id: %s", accountId, ownerId) + return nil, s3err.ErrAccessDenied + } + + return ValidateAndTransferGrants(accountManager, acp.Grants) + } else { + _, grants, errCode = ParseAndValidateAclHeadersOrElseDefault(r, accountManager, ownership, bucketOwnerId, accountId, true) + return grants, errCode + } +} + +// ParseAndValidateAclHeadersOrElseDefault will callParseAndValidateAclHeaders to get Grants, if empty, it will return Grant that grant `accountId` with `FullControl` permission +func ParseAndValidateAclHeadersOrElseDefault(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) { + ownerId, grants, errCode = ParseAndValidateAclHeaders(r, accountManager, ownership, bucketOwnerId, accountId, putAcl) + if errCode != s3err.ErrNone { + return + } + if len(grants) == 0 { + //if no acl(both customAcl and cannedAcl) specified, grant accountId(object writer) with full control permission + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &s3_constants.PermissionFullControl, + }) + } + return +} + +// ParseAndValidateAclHeaders parse and validate acl from header +func ParseAndValidateAclHeaders(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) { + ownerId, grants, errCode = ParseAclHeaders(r, ownership, bucketOwnerId, accountId, putAcl) + if errCode != s3err.ErrNone { + return + } + if len(grants) > 0 { + grants, errCode = ValidateAndTransferGrants(accountManager, grants) + } + return +} + +// ParseAclHeaders parse acl headers +// When `putAcl` is true, only `CannedAcl` is parsed, such as `PutBucketAcl` or `PutObjectAcl` +// is requested, `CustomAcl` is parsed from the request body not from headers, and only if the +// request body is empty, `CannedAcl` is parsed from the header, and will not parse `CustomAcl` from the header +// +// Since `CustomAcl` has higher priority, it will be parsed first; if `CustomAcl` does not exist, `CannedAcl` will be parsed +func ParseAclHeaders(r *http.Request, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) { + if !putAcl { + errCode = ParseCustomAclHeaders(r, &grants) + if errCode != s3err.ErrNone { + return "", nil, errCode + } + } + if len(grants) > 0 { + return accountId, grants, s3err.ErrNone + } + + cannedAcl := r.Header.Get(s3_constants.AmzCannedAcl) + if len(cannedAcl) == 0 { + return accountId, grants, s3err.ErrNone + } + + //if canned acl specified, parse cannedAcl (lower priority to custom acl) + ownerId, grants, errCode = ParseCannedAclHeader(ownership, bucketOwnerId, accountId, cannedAcl, putAcl) + if errCode != s3err.ErrNone { + return "", nil, errCode + } + return ownerId, grants, errCode +} + +func ParseCustomAclHeaders(r *http.Request, grants *[]*s3.Grant) s3err.ErrorCode { + customAclHeaders := []string{s3_constants.AmzAclFullControl, s3_constants.AmzAclRead, s3_constants.AmzAclReadAcp, s3_constants.AmzAclWrite, s3_constants.AmzAclWriteAcp} + var errCode s3err.ErrorCode + for _, customAclHeader := range customAclHeaders { + headerValue := r.Header.Get(customAclHeader) + switch customAclHeader { + case s3_constants.AmzAclRead: + errCode = ParseCustomAclHeader(headerValue, 
s3_constants.PermissionRead, grants) + case s3_constants.AmzAclWrite: + errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWrite, grants) + case s3_constants.AmzAclReadAcp: + errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionReadAcp, grants) + case s3_constants.AmzAclWriteAcp: + errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWriteAcp, grants) + case s3_constants.AmzAclFullControl: + errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionFullControl, grants) + } + if errCode != s3err.ErrNone { + return errCode + } + } + return s3err.ErrNone +} + +func ParseCustomAclHeader(headerValue, permission string, grants *[]*s3.Grant) s3err.ErrorCode { + if len(headerValue) > 0 { + split := strings.Split(headerValue, ", ") + for _, grantStr := range split { + kv := strings.Split(grantStr, "=") + if len(kv) != 2 { + return s3err.ErrInvalidRequest + } + + switch kv[0] { + case "id": + var accountId string + _ = json.Unmarshal([]byte(kv[1]), &accountId) + *grants = append(*grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &permission, + }) + case "emailAddress": + var emailAddress string + _ = json.Unmarshal([]byte(kv[1]), &emailAddress) + *grants = append(*grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeAmazonCustomerByEmail, + EmailAddress: &emailAddress, + }, + Permission: &permission, + }) + case "uri": + var groupName string + _ = json.Unmarshal([]byte(kv[1]), &groupName) + *grants = append(*grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &groupName, + }, + Permission: &permission, + }) + } + } + } + return s3err.ErrNone + +} + +func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl string, putAcl bool) (ownerId string, grants []*s3.Grant, err s3err.ErrorCode) { + err = s3err.ErrNone + ownerId = accountId + + //objectWrite automatically has full control on current object + objectWriterFullControl := &s3.Grant{ + Grantee: &s3.Grantee{ + ID: &accountId, + Type: &s3_constants.GrantTypeCanonicalUser, + }, + Permission: &s3_constants.PermissionFullControl, + } + + switch cannedAcl { + case s3_constants.CannedAclPrivate: + grants = append(grants, objectWriterFullControl) + case s3_constants.CannedAclPublicRead: + grants = append(grants, objectWriterFullControl) + grants = append(grants, s3_constants.PublicRead...) + case s3_constants.CannedAclPublicReadWrite: + grants = append(grants, objectWriterFullControl) + grants = append(grants, s3_constants.PublicReadWrite...) + case s3_constants.CannedAclAuthenticatedRead: + grants = append(grants, objectWriterFullControl) + grants = append(grants, s3_constants.AuthenticatedRead...) + case s3_constants.CannedAclLogDeliveryWrite: + grants = append(grants, objectWriterFullControl) + grants = append(grants, s3_constants.LogDeliveryWrite...) 
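+	// Note: every canned ACL case above expands to the object writer's implicit
+	// FULL_CONTROL grant plus a predefined group grant set; "public-read", for
+	// example, appends s3_constants.PublicRead (AllUsers/READ under standard S3
+	// semantics). The bucket-owner variants below additionally grant the bucket
+	// owner when it differs from the writer.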
+ case s3_constants.CannedAclBucketOwnerRead: + grants = append(grants, objectWriterFullControl) + if bucketOwnerId != "" && bucketOwnerId != accountId { + grants = append(grants, + &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &bucketOwnerId, + }, + Permission: &s3_constants.PermissionRead, + }) + } + case s3_constants.CannedAclBucketOwnerFullControl: + if bucketOwnerId != "" { + // if set ownership to 'BucketOwnerPreferred' when upload object, the bucket owner will be the object owner + if !putAcl && bucketOwnership == s3_constants.OwnershipBucketOwnerPreferred { + ownerId = bucketOwnerId + grants = append(grants, + &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &bucketOwnerId, + }, + Permission: &s3_constants.PermissionFullControl, + }) + } else { + grants = append(grants, objectWriterFullControl) + if accountId != bucketOwnerId { + grants = append(grants, + &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &bucketOwnerId, + }, + Permission: &s3_constants.PermissionFullControl, + }) + } + } + } + case s3_constants.CannedAclAwsExecRead: + err = s3err.ErrNotImplemented + default: + err = s3err.ErrInvalidRequest + } + return +} + +// ValidateAndTransferGrants validate grant & transfer Email-Grant to Id-Grant +func ValidateAndTransferGrants(accountManager *s3account.AccountManager, grants []*s3.Grant) ([]*s3.Grant, s3err.ErrorCode) { + var result []*s3.Grant + for _, grant := range grants { + grantee := grant.Grantee + if grantee == nil || grantee.Type == nil { + glog.Warning("invalid grantee! grantee or granteeType is nil") + return nil, s3err.ErrInvalidRequest + } + + switch *grantee.Type { + case s3_constants.GrantTypeGroup: + if grantee.URI == nil { + glog.Warning("invalid group grantee! group URI is nil") + return nil, s3err.ErrInvalidRequest + } + ok := s3_constants.ValidateGroup(*grantee.URI) + if !ok { + glog.Warningf("invalid group grantee! group name[%s] is not valid", *grantee.URI) + return nil, s3err.ErrInvalidRequest + } + result = append(result, grant) + case s3_constants.GrantTypeCanonicalUser: + if grantee.ID == nil { + glog.Warning("invalid canonical grantee! account id is nil") + return nil, s3err.ErrInvalidRequest + } + _, ok := accountManager.IdNameMapping[*grantee.ID] + if !ok { + glog.Warningf("invalid canonical grantee! account id[%s] is not exists", *grantee.ID) + return nil, s3err.ErrInvalidRequest + } + result = append(result, grant) + case s3_constants.GrantTypeAmazonCustomerByEmail: + if grantee.EmailAddress == nil { + glog.Warning("invalid email grantee! email address is nil") + return nil, s3err.ErrInvalidRequest + } + accountId, ok := accountManager.EmailIdMapping[*grantee.EmailAddress] + if !ok { + glog.Warningf("invalid email grantee! email address[%s] is not exists", *grantee.EmailAddress) + return nil, s3err.ErrInvalidRequest + } + result = append(result, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: grant.Permission, + }) + default: + return nil, s3err.ErrInvalidRequest + } + } + return result, s3err.ErrNone +} + +// DetermineReqGrants generates the grant set (Grants) according to accountId and reqPermission. 
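+// The returned set enumerates every grant that could satisfy the request: the
+// AllUsers group and the account itself, each paired with both aclAction and
+// FULL_CONTROL, plus, for non-anonymous accounts, the AuthenticatedUsers group
+// with the same two permissions. Callers can then test an entry's stored
+// grants for intersection with this set.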
+func DetermineReqGrants(accountId, aclAction string) (grants []*s3.Grant) { + // group grantee (AllUsers) + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &aclAction, + }) + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }) + + // canonical grantee (accountId) + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &aclAction, + }) + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &s3_constants.PermissionFullControl, + }) + + // group grantee (AuthenticateUsers) + if accountId != s3account.AccountAnonymous.Id { + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAuthenticatedUsers, + }, + Permission: &aclAction, + }) + grants = append(grants, &s3.Grant{ + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAuthenticatedUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }) + } + return +} + +func SetAcpOwnerHeader(r *http.Request, acpOwnerId string) { + r.Header.Set(s3_constants.ExtAmzOwnerKey, acpOwnerId) +} + +func GetAcpOwner(entryExtended map[string][]byte, defaultOwner string) string { + ownerIdBytes, ok := entryExtended[s3_constants.ExtAmzOwnerKey] + if ok && len(ownerIdBytes) > 0 { + return string(ownerIdBytes) + } + return defaultOwner +} + +func SetAcpGrantsHeader(r *http.Request, acpGrants []*s3.Grant) { + if len(acpGrants) > 0 { + a, err := json.Marshal(acpGrants) + if err == nil { + r.Header.Set(s3_constants.ExtAmzAclKey, string(a)) + } else { + glog.Warning("Marshal acp grants err", err) + } + } +} + +// GetAcpGrants return grants parsed from entry +func GetAcpGrants(entryExtended map[string][]byte) []*s3.Grant { + acpBytes, ok := entryExtended[s3_constants.ExtAmzAclKey] + if ok && len(acpBytes) > 0 { + var grants []*s3.Grant + err := json.Unmarshal(acpBytes, &grants) + if err == nil { + return grants + } + } + return nil +} + +// AssembleEntryWithAcp fill entry with owner and grants +func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grants []*s3.Grant) s3err.ErrorCode { + if objectEntry.Extended == nil { + objectEntry.Extended = make(map[string][]byte) + } + + if len(objectOwner) > 0 { + objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner) + } + + if len(grants) > 0 { + grantsBytes, err := json.Marshal(grants) + if err != nil { + glog.Warning("assemble acp to entry:", err) + return s3err.ErrInvalidRequest + } + objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes + } + + return s3err.ErrNone +} + +// GrantEquals Compare whether two Grants are equal in meaning, not completely +// equal (compare Grantee.Type and the corresponding Value for equality, other +// fields of Grantee are ignored) +func GrantEquals(a, b *s3.Grant) bool { + // grant + if a == b { + return true + } + + if a == nil || b == nil { + return false + } + + // grant.Permission + if a.Permission != b.Permission { + if a.Permission == nil || b.Permission == nil { + return false + } + + if *a.Permission != *b.Permission { + return false + } + } + + // grant.Grantee + ag := a.Grantee + bg := 
b.Grantee + if ag != bg { + if ag == nil || bg == nil { + return false + } + // grantee.Type + if ag.Type != bg.Type { + if ag.Type == nil || bg.Type == nil { + return false + } + if *ag.Type != *bg.Type { + return false + } + } + // value corresponding to granteeType + if ag.Type != nil { + switch *ag.Type { + case s3_constants.GrantTypeGroup: + if ag.URI != bg.URI { + if ag.URI == nil || bg.URI == nil { + return false + } + + if *ag.URI != *bg.URI { + return false + } + } + case s3_constants.GrantTypeCanonicalUser: + if ag.ID != bg.ID { + if ag.ID == nil || bg.ID == nil { + return false + } + + if *ag.ID != *bg.ID { + return false + } + } + case s3_constants.GrantTypeAmazonCustomerByEmail: + if ag.EmailAddress != bg.EmailAddress { + if ag.EmailAddress == nil || bg.EmailAddress == nil { + return false + } + + if *ag.EmailAddress != *bg.EmailAddress { + return false + } + } + } + } + } + return true +} diff --git a/weed/s3api/s3acl/acl_helper_test.go b/weed/s3api/s3acl/acl_helper_test.go new file mode 100644 index 000000000..efc137989 --- /dev/null +++ b/weed/s3api/s3acl/acl_helper_test.go @@ -0,0 +1,708 @@ +package s3acl + +import ( + "bytes" + "encoding/json" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3account" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "io" + "net/http" + "testing" +) + +var ( + accountManager = &s3account.AccountManager{ + IdNameMapping: map[string]string{ + s3account.AccountAdmin.Id: s3account.AccountAdmin.Name, + s3account.AccountAnonymous.Id: s3account.AccountAnonymous.Name, + "accountA": "accountA", + "accountB": "accountB", + }, + EmailIdMapping: map[string]string{ + s3account.AccountAdmin.EmailAddress: s3account.AccountAdmin.Id, + s3account.AccountAnonymous.EmailAddress: s3account.AccountAnonymous.Id, + "accountA@example.com": "accountA", + "accountBexample.com": "accountB", + }, + } +) + +func TestGetAccountId(t *testing.T) { + req := &http.Request{ + Header: make(map[string][]string), + } + //case1 + //accountId: "admin" + req.Header.Set(s3_constants.AmzAccountId, s3account.AccountAdmin.Id) + if GetAccountId(req) != s3account.AccountAdmin.Id { + t.Fatal("expect accountId: admin") + } + + //case2 + //accountId: "anoymous" + req.Header.Set(s3_constants.AmzAccountId, s3account.AccountAnonymous.Id) + if GetAccountId(req) != s3account.AccountAnonymous.Id { + t.Fatal("expect accountId: anonymous") + } + + //case3 + //accountId is nil => "anonymous" + req.Header.Del(s3_constants.AmzAccountId) + if GetAccountId(req) != s3account.AccountAnonymous.Id { + t.Fatal("expect accountId: anonymous") + } +} + +func TestExtractAcl(t *testing.T) { + type Case struct { + id int + resultErrCode, expectErrCode s3err.ErrorCode + resultGrants, expectGrants []*s3.Grant + } + testCases := make([]*Case, 0) + accountAdminId := "admin" + + { + //case1 (good case) + //parse acp from request body + req := &http.Request{ + Header: make(map[string][]string), + } + req.Body = io.NopCloser(bytes.NewReader([]byte(` + + + admin + admin + + + + + admin + + FULL_CONTROL + + + + http://acs.amazonaws.com/groups/global/AllUsers + + FULL_CONTROL + + + + `))) + objectWriter := "accountA" + grants, errCode := ExtractAcl(req, accountManager, s3_constants.OwnershipObjectWriter, accountAdminId, accountAdminId, objectWriter) + testCases = append(testCases, &Case{ + 1, + errCode, s3err.ErrNone, + grants, []*s3.Grant{ + { + Grantee: &s3.Grantee{ + 
Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountAdminId, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }, + }, + }) + } + + { + //case2 (good case) + //parse acp from header (cannedAcl) + req := &http.Request{ + Header: make(map[string][]string), + } + req.Body = nil + req.Header.Set(s3_constants.AmzCannedAcl, s3_constants.CannedAclPrivate) + objectWriter := "accountA" + grants, errCode := ExtractAcl(req, accountManager, s3_constants.OwnershipObjectWriter, accountAdminId, accountAdminId, objectWriter) + testCases = append(testCases, &Case{ + 2, + errCode, s3err.ErrNone, + grants, []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &objectWriter, + }, + Permission: &s3_constants.PermissionFullControl, + }, + }, + }) + } + + { + //case3 (bad case) + //parse acp from request body (content is invalid) + req := &http.Request{ + Header: make(map[string][]string), + } + req.Body = io.NopCloser(bytes.NewReader([]byte("zdfsaf"))) + req.Header.Set(s3_constants.AmzCannedAcl, s3_constants.CannedAclPrivate) + objectWriter := "accountA" + _, errCode := ExtractAcl(req, accountManager, s3_constants.OwnershipObjectWriter, accountAdminId, accountAdminId, objectWriter) + testCases = append(testCases, &Case{ + id: 3, + resultErrCode: errCode, expectErrCode: s3err.ErrInvalidRequest, + }) + } + + //case4 (bad case) + //parse acp from header (cannedAcl is invalid) + req := &http.Request{ + Header: make(map[string][]string), + } + req.Body = nil + req.Header.Set(s3_constants.AmzCannedAcl, "dfaksjfk") + objectWriter := "accountA" + _, errCode := ExtractAcl(req, accountManager, s3_constants.OwnershipObjectWriter, accountAdminId, "", objectWriter) + testCases = append(testCases, &Case{ + id: 4, + resultErrCode: errCode, expectErrCode: s3err.ErrInvalidRequest, + }) + + { + //case5 (bad case) + //parse acp from request body: owner is inconsistent + req.Body = io.NopCloser(bytes.NewReader([]byte(` + + + admin + admin + + + + + admin + + FULL_CONTROL + + + + http://acs.amazonaws.com/groups/global/AllUsers + + FULL_CONTROL + + + + `))) + objectWriter = "accountA" + _, errCode := ExtractAcl(req, accountManager, s3_constants.OwnershipObjectWriter, accountAdminId, objectWriter, objectWriter) + testCases = append(testCases, &Case{ + id: 5, + resultErrCode: errCode, expectErrCode: s3err.ErrAccessDenied, + }) + } + + for _, tc := range testCases { + if tc.resultErrCode != tc.expectErrCode { + t.Fatalf("case[%d]: errorCode not expect", tc.id) + } + if !grantsEquals(tc.resultGrants, tc.expectGrants) { + t.Fatalf("case[%d]: grants not expect", tc.id) + } + } +} + +func TestParseAndValidateAclHeaders(t *testing.T) { + type Case struct { + id int + resultOwner, expectOwner string + resultErrCode, expectErrCode s3err.ErrorCode + resultGrants, expectGrants []*s3.Grant + } + testCases := make([]*Case, 0) + bucketOwner := "admin" + + { + //case1 (good case) + //parse custom acl + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzAclFullControl, `uri="http://acs.amazonaws.com/groups/global/AllUsers", id="anonymous", emailAddress="admin@example.com"`) + ownerId, grants, errCode := ParseAndValidateAclHeaders(req, accountManager, s3_constants.OwnershipObjectWriter, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ 
+ 1, + ownerId, objectWriter, + errCode, s3err.ErrNone, + grants, []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &s3account.AccountAnonymous.Id, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &s3account.AccountAdmin.Id, + }, + Permission: &s3_constants.PermissionFullControl, + }, + }, + }) + } + { + //case2 (good case) + //parse canned acl (ownership=ObjectWriter) + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzCannedAcl, s3_constants.CannedAclBucketOwnerFullControl) + ownerId, grants, errCode := ParseAndValidateAclHeaders(req, accountManager, s3_constants.OwnershipObjectWriter, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ + 2, + ownerId, objectWriter, + errCode, s3err.ErrNone, + grants, []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &objectWriter, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &bucketOwner, + }, + Permission: &s3_constants.PermissionFullControl, + }, + }, + }) + } + { + //case3 (good case) + //parse canned acl (ownership=OwnershipBucketOwnerPreferred) + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzCannedAcl, s3_constants.CannedAclBucketOwnerFullControl) + ownerId, grants, errCode := ParseAndValidateAclHeaders(req, accountManager, s3_constants.OwnershipBucketOwnerPreferred, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ + 3, + ownerId, bucketOwner, + errCode, s3err.ErrNone, + grants, []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &bucketOwner, + }, + Permission: &s3_constants.PermissionFullControl, + }, + }, + }) + } + { + //case4 (bad case) + //parse custom acl (grantee id not exists) + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzAclFullControl, `uri="http://acs.amazonaws.com/groups/global/AllUsers", id="notExistsAccount", emailAddress="admin@example.com"`) + _, _, errCode := ParseAndValidateAclHeaders(req, accountManager, s3_constants.OwnershipObjectWriter, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ + id: 4, + resultErrCode: errCode, expectErrCode: s3err.ErrInvalidRequest, + }) + } + + { + //case5 (bad case) + //parse custom acl (invalid format) + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzAclFullControl, `uri="http:sfasf"`) + _, _, errCode := ParseAndValidateAclHeaders(req, accountManager, s3_constants.OwnershipObjectWriter, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ + id: 5, + resultErrCode: errCode, expectErrCode: s3err.ErrInvalidRequest, + }) + } + + { + //case6 (bad case) + //parse canned acl (invalid value) + req := &http.Request{ + Header: make(map[string][]string), + } + objectWriter := "accountA" + req.Header.Set(s3_constants.AmzCannedAcl, `uri="http:sfasf"`) + _, _, errCode := ParseAndValidateAclHeaders(req, accountManager, 
s3_constants.OwnershipObjectWriter, bucketOwner, objectWriter, false) + testCases = append(testCases, &Case{ + id: 5, + resultErrCode: errCode, expectErrCode: s3err.ErrInvalidRequest, + }) + } + + for _, tc := range testCases { + if tc.expectErrCode != tc.resultErrCode { + t.Errorf("case[%d]: errCode unexpect", tc.id) + } + if tc.resultOwner != tc.expectOwner { + t.Errorf("case[%d]: ownerId unexpect", tc.id) + } + if !grantsEquals(tc.resultGrants, tc.expectGrants) { + t.Fatalf("case[%d]: grants not expect", tc.id) + } + } +} + +func grantsEquals(a, b []*s3.Grant) bool { + if len(a) != len(b) { + return false + } + for i, grant := range a { + if !GrantEquals(grant, b[i]) { + return false + } + } + return true +} + +func TestDetermineReqGrants(t *testing.T) { + { + //case1: request account is anonymous + accountId := s3account.AccountAnonymous.Id + reqPermission := s3_constants.PermissionRead + + resultGrants := DetermineReqGrants(accountId, reqPermission) + expectGrants := []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &reqPermission, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &reqPermission, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &s3_constants.PermissionFullControl, + }, + } + if !grantsEquals(resultGrants, expectGrants) { + t.Fatalf("grants not expect") + } + } + { + //case2: request account is not anonymous (Iam authed) + accountId := "accountX" + reqPermission := s3_constants.PermissionRead + + resultGrants := DetermineReqGrants(accountId, reqPermission) + expectGrants := []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &reqPermission, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &reqPermission, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeCanonicalUser, + ID: &accountId, + }, + Permission: &s3_constants.PermissionFullControl, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAuthenticatedUsers, + }, + Permission: &reqPermission, + }, + { + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAuthenticatedUsers, + }, + Permission: &s3_constants.PermissionFullControl, + }, + } + if !grantsEquals(resultGrants, expectGrants) { + t.Fatalf("grants not expect") + } + } +} + +func TestAssembleEntryWithAcp(t *testing.T) { + defaultOwner := "admin" + { + //case1 + expectOwner := "accountS" + expectGrants := []*s3.Grant{ + { + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, + } + entry := &filer_pb.Entry{} + AssembleEntryWithAcp(entry, expectOwner, expectGrants) + + resultOwner := GetAcpOwner(entry.Extended, defaultOwner) + if resultOwner != expectOwner { + t.Fatalf("owner not expect") + } + + resultGrants := 
GetAcpGrants(entry.Extended) + if !grantsEquals(resultGrants, expectGrants) { + t.Fatal("grants not expect") + } + } + { + //case2 + entry := &filer_pb.Entry{} + AssembleEntryWithAcp(entry, "", nil) + + resultOwner := GetAcpOwner(entry.Extended, defaultOwner) + if resultOwner != defaultOwner { + t.Fatalf("owner not expect") + } + + resultGrants := GetAcpGrants(entry.Extended) + if len(resultGrants) != 0 { + t.Fatal("grants not expect") + } + } + +} + +func TestGrantEquals(t *testing.T) { + testCases := map[bool]bool{ + GrantEquals(nil, nil): true, + + GrantEquals(&s3.Grant{}, nil): false, + + GrantEquals(&s3.Grant{}, &s3.Grant{}): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + }, &s3.Grant{}): false, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{}, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{}, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{}, + }): false, + + //type not present, compare other fields of grant is meaningless + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + ID: &s3account.AccountAdmin.Id, + EmailAddress: &s3account.AccountAdmin.EmailAddress, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + ID: &s3account.AccountAdmin.Id, + }, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + }, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionWrite, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }): false, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + }, + }): true, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + }, + }): false, + + GrantEquals(&s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: 
&s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, &s3.Grant{ + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }): true, + } + + for tc, expect := range testCases { + if tc != expect { + t.Fatal("TestGrantEquals not expect!") + } + } +} + +func TestSetAcpOwnerHeader(t *testing.T) { + ownerId := "accountZ" + req := &http.Request{ + Header: make(map[string][]string), + } + SetAcpOwnerHeader(req, ownerId) + + if req.Header.Get(s3_constants.ExtAmzOwnerKey) != ownerId { + t.Fatalf("owner unexpect") + } +} + +func TestSetAcpGrantsHeader(t *testing.T) { + req := &http.Request{ + Header: make(map[string][]string), + } + grants := []*s3.Grant{ + { + Permission: &s3_constants.PermissionRead, + Grantee: &s3.Grantee{ + Type: &s3_constants.GrantTypeGroup, + ID: &s3account.AccountAdmin.Id, + URI: &s3_constants.GranteeGroupAllUsers, + }, + }, + } + SetAcpGrantsHeader(req, grants) + + grantsJson, _ := json.Marshal(grants) + if req.Header.Get(s3_constants.ExtAmzAclKey) != string(grantsJson) { + t.Fatalf("owner unexpect") + } +} From cacc3e883b6285f688eda5718dfc9c9817ea07d0 Mon Sep 17 00:00:00 2001 From: famosss Date: Thu, 13 Oct 2022 12:13:26 +0800 Subject: [PATCH 08/12] volume server:set the default value of "hasSlowRead" to true (#3710) * simplify a bit * feat: volume: add "readBufSize" option to customize read optimization * refactor : redbufSIze -> readBufferSize * simplify a bit * simplify a bit * volume server:set the default value of "hasSlowRead" to true --- weed/command/volume.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/command/volume.go b/weed/command/volume.go index 4c6bb20c2..e0bc8b8fe 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -98,7 +98,7 @@ func init() { v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") - v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", false, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") + v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", true, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") v.readBufferSizeMB = cmdVolume.Flag.Int("readBufferSizeMB", 4, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally.") } From 9a339a9cfbeae9f1851ed738659d72de913e798b Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Oct 2022 21:15:10 -0700 Subject: [PATCH 09/12] default hasSlowRead to true --- weed/command/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/command/server.go b/weed/command/server.go index 42cd4b0cd..526841e28 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -132,7 +132,7 @@ func init() { serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. 
precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") - serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", false, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") + serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", true, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") serverOptions.v.readBufferSizeMB = cmdServer.Flag.Int("volume.readBufferSizeMB", 4, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") From f95c25e113c2c19353d91ad61e533567dfa877ba Mon Sep 17 00:00:00 2001 From: Guo Lei Date: Thu, 13 Oct 2022 13:59:07 +0800 Subject: [PATCH 10/12] types package is imported more than once (#3838) --- weed/storage/needle_map_leveldb.go | 9 ++++----- weed/storage/volume_vacuum.go | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 30ed96c3b..1566ca7a0 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -10,7 +10,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" "github.com/seaweedfs/seaweedfs/weed/storage/idx" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" @@ -56,7 +55,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option } } glog.V(0).Infof("Loading %s...
, watermark: %d", dbFileName, getWatermark(m.db)) - m.recordCount = uint64(m.indexFileOffset / types.NeedleMapEntrySize) + m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize) watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize err = setWatermark(m.db, watermark) if err != nil { @@ -100,10 +99,10 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { glog.Fatalf("stat file %s: %v", indexFile.Name(), err) return err } else { - if watermark*types.NeedleMapEntrySize > uint64(stat.Size()) { + if watermark*NeedleMapEntrySize > uint64(stat.Size()) { glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size()) } - glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*types.NeedleMapEntrySize)/types.NeedleMapEntrySize) + glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*NeedleMapEntrySize)/NeedleMapEntrySize) } return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error { if !offset.IsZero() && size.IsValid() { @@ -270,7 +269,7 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts * return e } m.indexFileOffset = stat.Size() - m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize) + m.recordCount = uint64(stat.Size() / NeedleMapEntrySize) //set watermark watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 38b5c0080..47b0800eb 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -13,7 +13,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" - "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -341,7 +340,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } } - return v.tmpNm.DoOffsetLoading(v, idx, uint64(idxSize)/types.NeedleMapEntrySize) + return v.tmpNm.DoOffsetLoading(v, idx, uint64(idxSize)/NeedleMapEntrySize) } type VolumeFileScanner4Vacuum struct { From f5d4952d7306ba013bb9c054b221d795a3e110d6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 12 Oct 2022 23:50:09 -0700 Subject: [PATCH 11/12] filer: redis store reduce from 2 redis operations to 1 for updates. 
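The shape of the change, common to all three redis stores below, sketched against a toy store (go-redis v8 client, with the filer's entry encoding reduced to a byte slice; names other than Set and SAdd are illustrative):

package sketch

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

type toyStore struct{ client redis.UniversalClient }

// doInsertEntry is the factored-out KV write: a single SET, no directory upkeep.
func (s *toyStore) doInsertEntry(ctx context.Context, path string, value []byte, ttl time.Duration) error {
	if err := s.client.Set(ctx, path, value, ttl).Err(); err != nil {
		return fmt.Errorf("persisting %s : %v", path, err)
	}
	return nil
}

// InsertEntry also registers the new entry in its parent directory's child set.
func (s *toyStore) InsertEntry(ctx context.Context, dir, name string, value []byte) error {
	if err := s.doInsertEntry(ctx, dir+"/"+name, value, 0); err != nil {
		return err
	}
	return s.client.SAdd(ctx, dir, name).Err()
}

// UpdateEntry previously delegated to InsertEntry, repeating the SAdd on every
// update; it now calls doInsertEntry only, since an in-place update cannot
// change directory membership: one redis operation instead of two.
func (s *toyStore) UpdateEntry(ctx context.Context, dir, name string, value []byte) error {
	return s.doInsertEntry(ctx, dir+"/"+name, value, 0)
}
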
--- weed/filer/redis/universal_redis_store.go | 27 ++++++++++++------- weed/filer/redis2/universal_redis_store.go | 31 +++++++++++++--------- weed/filer/redis3/universal_redis_store.go | 29 ++++++++++++-------- 3 files changed, 54 insertions(+), 33 deletions(-) diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index f5dc513c4..8e1fa326b 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -35,6 +35,22 @@ func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + if err = store.doInsertEntry(ctx, entry); err != nil { + return err + } + + dir, name := entry.FullPath.DirAndName() + if name != "" { + _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result() + if err != nil { + return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) + } + } + + return nil +} + +func (store *UniversalRedisStore) doInsertEntry(ctx context.Context, entry *filer.Entry) error { value, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) @@ -49,21 +65,12 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. if err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } - - dir, name := entry.FullPath.DirAndName() - if name != "" { - _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result() - if err != nil { - return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) - } - } - return nil } func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - return store.InsertEntry(ctx, entry) + return store.doInsertEntry(ctx, entry) } func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 8b23472b9..0c79c5255 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -47,17 +47,8 @@ func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) erro func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - value, err := entry.EncodeAttributesAndChunks() - if err != nil { - return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) - } - - if len(entry.Chunks) > filer.CountEntryChunksForGzip { - value = util.MaybeGzipData(value) - } - - if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { - return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + if err = store.doInsertEntry(ctx, entry); err != nil { + return err } dir, name := entry.FullPath.DirAndName() @@ -74,9 +65,25 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return nil } +func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error { + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + + if len(entry.Chunks) > filer.CountEntryChunksForGzip { + value = util.MaybeGzipData(value) + } + + if err = store.Client.Set(ctx, string(entry.FullPath), value, 
 		time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+	}
+	return nil
+}
+
 func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
-	return store.InsertEntry(ctx, entry)
+	return store.doInsertEntry(ctx, entry)
 }
 
 func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go
index b076f78e7..88d4ed1e3 100644
--- a/weed/filer/redis3/universal_redis_store.go
+++ b/weed/filer/redis3/universal_redis_store.go
@@ -3,11 +3,11 @@ package redis3
 import (
 	"context"
 	"fmt"
-	"github.com/go-redsync/redsync/v4"
 	"time"
 
 	"github.com/go-redis/redis/v8"
+	redsync "github.com/go-redsync/redsync/v4"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,6 +35,22 @@ func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) erro
 
 func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
 
+	if err = store.doInsertEntry(ctx, entry); err != nil {
+		return err
+	}
+
+	dir, name := entry.FullPath.DirAndName()
+
+	if name != "" {
+		if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
+			return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+		}
+	}
+
+	return nil
+}
+
+func (store *UniversalRedis3Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
 	value, err := entry.EncodeAttributesAndChunks()
 	if err != nil {
 		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
@@ -47,21 +63,12 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer
 	if err = store.Client.Set(ctx, string(entry.FullPath), value,
 		time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
 		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
 	}
-
-	dir, name := entry.FullPath.DirAndName()
-
-	if name != "" {
-		if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
-			return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
-		}
-	}
-
 	return nil
 }
 
 func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
-	return store.InsertEntry(ctx, entry)
+	return store.doInsertEntry(ctx, entry)
 }
 
 func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

From 1f7e52c63e210ccb3a177c1e58d5a0c8e79ad870 Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>
Date: Thu, 13 Oct 2022 12:51:20 +0500
Subject: [PATCH 12/12] vacuum metrics and force sync dst files (#3832)

---
 weed/server/volume_grpc_vacuum.go        | 21 ++++++++++++++----
 weed/stats/metrics.go                    | 28 ++++++++++++++++++++++++
 weed/storage/backend/disk_file.go        |  3 +++
 weed/storage/disk_location.go            |  2 +-
 weed/storage/erasure_coding/ec_volume.go |  1 +
 weed/storage/needle_map/memdb.go         |  5 ++++-
 weed/storage/volume.go                   | 18 +--------------
 weed/storage/volume_vacuum.go            | 25 ++++++++++++---------
 8 files changed, 70 insertions(+), 33 deletions(-)

diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index 5252584e1..296760ba6 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -2,6 +2,9 @@ package weed_server
 
 import (
 	"context"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
+	"strconv"
+	"time"
 
 	"github.com/prometheus/procfs"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
 }
 
 func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
+	start := time.Now()
+	defer func(start time.Time) {
+		stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
+	}(start)
 	resp := &volume_server_pb.VacuumVolumeCompactResponse{}
 
 	reportInterval := int64(1024 * 1024 * 128)
@@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
 		return true
 	})
 
+	stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
 	if err != nil {
-		glog.Errorf("compact volume %d: %v", req.VolumeId, err)
+		glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
 		return err
 	}
 	if sendErr != nil {
-		glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+		glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
 		return sendErr
 	}
 
@@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
 }
 
 func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
+	start := time.Now()
+	defer func(start time.Time) {
+		stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
+	}(start)
 	resp := &volume_server_pb.VacuumVolumeCommitResponse{}
 
 	readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
 
 	if err != nil {
-		glog.Errorf("commit volume %d: %v", req.VolumeId, err)
+		glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
 	} else {
 		glog.V(1).Infof("commit volume %d", req.VolumeId)
 	}
+	stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
 	resp.IsReadOnly = readOnly
 
 	return resp, err
 
@@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
 	err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
 
 	if err != nil {
-		glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
+		glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
 	} else {
 		glog.V(1).Infof("cleanup volume %d", req.VolumeId)
 	}
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index d1723fdc6..9f9c0c18d 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -137,6 +137,31 @@ var (
 			Help:      "Counter of volume server requests.",
 		}, []string{"type"})
 
+	VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: Namespace,
+			Subsystem: "volumeServer",
+			Name:      "vacuuming_compact_count",
+			Help:      "Counter of volume vacuuming Compact counter",
+		}, []string{"success"})
+
+	VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: Namespace,
+			Subsystem: "volumeServer",
+			Name:      "vacuuming_commit_count",
+			Help:      "Counter of volume vacuuming commit counter",
+		}, []string{"success"})
+
+	VolumeServerVacuumingHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: Namespace,
+			Subsystem: "volumeServer",
+			Name:      "vacuuming_seconds",
+			Help:      "Bucketed histogram of volume server vacuuming processing time.",
+			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 24),
+		}, []string{"type"})
+
 	VolumeServerRequestHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: Namespace,
@@ -223,6 +248,9 @@ func init() {
 
 	Gather.MustRegister(VolumeServerRequestCounter)
 	Gather.MustRegister(VolumeServerRequestHistogram)
+	Gather.MustRegister(VolumeServerVacuumingCompactCounter)
+	Gather.MustRegister(VolumeServerVacuumingCommitCounter)
+	Gather.MustRegister(VolumeServerVacuumingHistogram)
 	Gather.MustRegister(VolumeServerVolumeCounter)
 	Gather.MustRegister(VolumeServerMaxVolumeCounter)
 	Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7a3a40977..18dde8dca 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error {
 }
 
 func (df *DiskFile) Close() error {
+	if err := df.Sync(); err != nil {
+		return err
+	}
 	return df.File.Close()
 }
 
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 6f938da8f..b3be04703 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int {
 func (l *DiskLocation) SetStopping() {
 	l.volumesLock.Lock()
 	for _, v := range l.volumes {
-		v.SetStopping()
+		v.SyncToDisk()
 	}
 	l.volumesLock.Unlock()
 
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index aa1e15722..ddee742a8 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -125,6 +125,7 @@ func (ev *EcVolume) Close() {
 		ev.ecjFileAccessLock.Unlock()
 	}
 	if ev.ecxFile != nil {
+		_ = ev.ecxFile.Sync()
 		_ = ev.ecxFile.Close()
 		ev.ecxFile = nil
 	}
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index 7fb98dcea..463245cd1 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
 	if err != nil {
 		return
 	}
-	defer idxFile.Close()
+	defer func() {
+		idxFile.Sync()
+		idxFile.Close()
+	}()
 
 	return cm.AscendingVisit(func(value NeedleValue) error {
 		if value.Offset.IsZero() || value.Size.IsDeleted() {
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 1a9c8bd24..ab8af91e2 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType {
 	return v.location.DiskType
 }
 
-func (v *Volume) SetStopping() {
-	v.dataFileAccessLock.Lock()
-	defer v.dataFileAccessLock.Unlock()
-	if v.nm != nil {
-		if err := v.nm.Sync(); err != nil {
-			glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
-		}
-	}
-	if v.DataBackend != nil {
-		if err := v.DataBackend.Sync(); err != nil {
-			glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
-		}
-	}
-}
-
 func (v *Volume) SyncToDisk() {
 	v.dataFileAccessLock.Lock()
 	defer v.dataFileAccessLock.Unlock()
@@ -228,10 +213,9 @@ func (v *Volume) Close() {
 		v.nm = nil
 	}
 	if v.DataBackend != nil {
-		if err := v.DataBackend.Sync(); err != nil {
+		if err := v.DataBackend.Close(); err != nil {
 			glog.Warningf("Volume Close fail to sync volume %d", v.Id)
 		}
-		_ = v.DataBackend.Close()
 		v.DataBackend = nil
 		stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
 	}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 47b0800eb..0eaca5ff4 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -55,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
 	v.lastCompactRevision = v.SuperBlock.CompactionRevision
 	glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
 	if err := v.DataBackend.Sync(); err != nil {
-		glog.V(0).Infof("compact fail to sync volume %d", v.Id)
+		glog.V(0).Infof("compact failed to sync volume %d", v.Id)
 	}
 	if err := v.nm.Sync(); err != nil {
-		glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
+		glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
 	}
 	return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
 }
@@ -83,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
 		return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
 	}
 	if err := v.DataBackend.Sync(); err != nil {
-		glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err)
+		glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
 	}
 	if err := v.nm.Sync(); err != nil {
-		glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
+		glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
 	}
 	return v.copyDataBasedOnIndexFile(
 		v.FileName(".dat"), v.FileName(".idx"),
@@ -120,7 +120,7 @@ func (v *Volume) CommitCompact() error {
 	}
 	if v.DataBackend != nil {
 		if err := v.DataBackend.Close(); err != nil {
-			glog.V(0).Infof("fail to close volume %d", v.Id)
+			glog.V(0).Infof("failed to close volume %d", v.Id)
 		}
 	}
 	v.DataBackend = nil
@@ -270,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
 	}
 
-	defer idx.Close()
+	defer func() {
+		idx.Sync()
+		idx.Close()
+	}()
+
 	stat, err := idx.Stat()
 	if err != nil {
 		return fmt.Errorf("stat file %s: %v", idx.Name(), err)
@@ -387,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
 }
 
 func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
-	var (
-		dst backend.BackendStorageFile
-	)
+	var dst backend.BackendStorageFile
 	if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
 		return err
 	}
@@ -493,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
 		glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
 		return err
 	}
-	defer indexFile.Close()
+	defer func() {
+		indexFile.Sync()
+		indexFile.Close()
+	}()
 	if v.tmpNm != nil {
 		v.tmpNm.Close()
 		v.tmpNm = nil
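
The redis2/redis3 patch above splits entry persistence out of InsertEntry so that UpdateEntry stops re-inserting the entry's name into its parent directory listing on every update. Below is a minimal sketch of the same split against an in-memory map instead of Redis; Entry, memStore, and the field names are illustrative stand-ins, not the SeaweedFS types.

package main

import (
	"context"
	"fmt"
	"path"
)

// Entry is a stand-in for filer.Entry: a full path plus encoded bytes.
type Entry struct {
	FullPath string
	Value    []byte
}

// memStore mimics the shape of the redis stores: one key per entry,
// plus a per-directory children listing.
type memStore struct {
	kv       map[string][]byte
	children map[string]map[string]bool
}

// doInsertEntry persists only the entry itself, nothing else.
func (s *memStore) doInsertEntry(ctx context.Context, e *Entry) error {
	s.kv[e.FullPath] = e.Value
	return nil
}

// InsertEntry persists the entry and links it into the parent listing.
func (s *memStore) InsertEntry(ctx context.Context, e *Entry) error {
	if err := s.doInsertEntry(ctx, e); err != nil {
		return err
	}
	dir, name := path.Split(e.FullPath)
	if name != "" {
		if s.children[dir] == nil {
			s.children[dir] = map[string]bool{}
		}
		s.children[dir][name] = true
	}
	return nil
}

// UpdateEntry rewrites the value but must not touch the parent listing,
// which is why it calls doInsertEntry rather than InsertEntry.
func (s *memStore) UpdateEntry(ctx context.Context, e *Entry) error {
	return s.doInsertEntry(ctx, e)
}

func main() {
	s := &memStore{kv: map[string][]byte{}, children: map[string]map[string]bool{}}
	e := &Entry{FullPath: "/dir/file", Value: []byte("v1")}
	s.InsertEntry(context.Background(), e)
	e.Value = []byte("v2")
	s.UpdateEntry(context.Background(), e)
	fmt.Println(len(s.children["/dir/"])) // still exactly one child
}

Keeping doInsertEntry free of directory bookkeeping is what makes it safe for both paths: an insert needs the parent link exactly once, and an update must never add it again.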
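
Patch 12 wraps compact and commit in the same instrumentation shape: a deferred histogram observation for duration, plus a counter labeled by outcome. The following standalone sketch shows that shape, assuming only the prometheus/client_golang API; the metric names and the doCompact stub are invented for illustration, not the metrics registered by SeaweedFS.

package main

import (
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	vacuumSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "vacuuming_seconds_sketch",
		Help: "Bucketed histogram of vacuuming processing time.",
		// 24 doubling buckets from 0.1ms up to roughly 14 minutes
		Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
	}, []string{"type"})

	vacuumCompactCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "vacuuming_compact_count_sketch",
		Help: "Counter of vacuum compact operations, labeled by outcome.",
	}, []string{"success"})
)

// compact times the (stubbed) work with a deferred observation and
// records success or failure on the outcome-labeled counter.
func compact(vid int) error {
	start := time.Now()
	defer func() {
		vacuumSeconds.WithLabelValues("compact").Observe(time.Since(start).Seconds())
	}()
	err := doCompact(vid)
	vacuumCompactCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
	return err
}

func doCompact(vid int) error {
	if vid < 0 {
		return errors.New("bad volume id")
	}
	return nil // stand-in for the real copy/compact work
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(vacuumSeconds, vacuumCompactCount)
	fmt.Println(compact(1), compact(-1))
}

The deferred closure captures start, so the observation fires on every exit path, including the error returns.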
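
The storage hunks of patch 12 all apply one idea: sync before close on the dat, idx, and ecx files, so a compaction that reports success is actually durable if the process dies immediately afterwards. A sketch of the pattern on a plain *os.File follows; closeSynced is a hypothetical helper, not SeaweedFS code.

package main

import (
	"log"
	"os"
)

// closeSynced flushes the OS page cache to stable storage before releasing
// the descriptor, so a crash right after Close cannot drop recently
// written bytes.
func closeSynced(f *os.File) error {
	if err := f.Sync(); err != nil {
		f.Close() // still release the fd, but report the sync failure
		return err
	}
	return f.Close()
}

func main() {
	f, err := os.CreateTemp("", "idx")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString("needle index data"); err != nil {
		log.Fatal(err)
	}
	if err := closeSynced(f); err != nil {
		log.Fatal(err)
	}
}

The order matters: Sync first surfaces write-back errors while the file is still open, whereas a bare Close can return success even though dirty pages are lost on power failure.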