Browse Source

optimize readability

Signed-off-by: changlin.shi <changlin.shi@ly.com>
pull/4090/head
changlin.shi 2 years ago
parent
commit
0c3009911e
  1. 4
      weed/s3api/auth_credentials_subscribe.go
  2. 53
      weed/s3api/bucket_metadata.go
  3. 10
      weed/s3api/bucket_metadata_test.go
  4. 24
      weed/s3api/filer_multipart.go
  5. 4
      weed/s3api/s3_constants/acp_ownership.go
  6. 1
      weed/s3api/s3_constants/extend_key.go
  7. 1
      weed/s3api/s3_constants/header.go
  8. 365
      weed/s3api/s3acl/acl_helper.go
  9. 1391
      weed/s3api/s3acl/acl_helper_test.go
  10. 300
      weed/s3api/s3api_acp.go
  11. 43
      weed/s3api/s3api_bucket_handlers.go
  12. 14
      weed/s3api/s3api_object_handlers.go
  13. 63
      weed/s3api/s3api_object_multipart_handlers.go
  14. 10
      weed/s3api/s3api_server.go
  15. 14
      weed/s3api/s3err/s3api_errors.go
  16. 15
      weed/server/filer_server_handlers_read.go

4
weed/s3api/auth_credentials_subscribe.go

@ -70,10 +70,10 @@ func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.En
if dir == s3a.option.BucketsPath {
if newEntry != nil {
s3a.bucketRegistry.LoadBucketMetadata(newEntry)
glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
glog.V(1).Infof("updated bucketMetadata %s/%s", dir, newEntry)
} else {
s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
glog.V(1).Infof("remove bucketMetadata %s/%s", dir, newEntry)
}
}
return nil

53
weed/s3api/bucket_metadata.go

@ -107,7 +107,6 @@ func buildBucketMetadata(accountManager *s3account.AccountManager, entry *filer_
}
}
//access control policy
//owner
acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey]
if ok && len(acpOwnerBytes) > 0 {
@ -122,9 +121,11 @@ func buildBucketMetadata(accountManager *s3account.AccountManager, entry *filer_
}
}
}
//grants
acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey]
if ok && len(acpGrantsBytes) > 0 {
if ok {
if len(acpGrantsBytes) > 0 {
var grants []*s3.Grant
err := json.Unmarshal(acpGrantsBytes, &grants)
if err == nil {
@ -133,6 +134,18 @@ func buildBucketMetadata(accountManager *s3account.AccountManager, entry *filer_
glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name)
}
}
} else {
bucketMetadata.Acl = []*s3.Grant{
{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: bucketMetadata.Owner.ID,
},
Permission: &s3_constants.PermissionFullControl,
},
}
}
}
return bucketMetadata
}
@ -143,17 +156,12 @@ func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
}
func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
r.metadataCacheLock.RLock()
bucketMetadata, ok := r.metadataCache[bucketName]
r.metadataCacheLock.RUnlock()
if ok {
bucketMetadata := r.getMetadataCache(bucketName)
if bucketMetadata != nil {
return bucketMetadata, s3err.ErrNone
}
r.notFoundLock.RLock()
_, ok = r.notFound[bucketName]
r.notFoundLock.RUnlock()
if ok {
if r.isNotFound(bucketName) {
return nil, s3err.ErrNoSuchBucket
}
@ -172,10 +180,8 @@ func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*Bucket
defer r.notFoundLock.Unlock()
//check if already exists
r.metadataCacheLock.RLock()
bucketMetaData, ok := r.metadataCache[bucketName]
r.metadataCacheLock.RUnlock()
if ok {
bucketMetaData := r.getMetadataCache(bucketName)
if bucketMetaData != nil {
return bucketMetaData, s3err.ErrNone
}
@ -184,6 +190,7 @@ func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*Bucket
if err != nil {
if err == filer_pb.ErrNotFound {
// The bucket doesn't actually exist and should no longer loaded from the filer
glog.Warning("bucket not found in filer: ", bucketName)
r.notFound[bucketName] = struct{}{}
return nil, s3err.ErrNoSuchBucket
}
@ -192,6 +199,15 @@ func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*Bucket
return bucketMetadata, s3err.ErrNone
}
func (r *BucketRegistry) getMetadataCache(bucket string) *BucketMetaData {
r.metadataCacheLock.RLock()
defer r.metadataCacheLock.RUnlock()
if cache, ok := r.metadataCache[bucket]; ok {
return cache
}
return nil
}
func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
r.metadataCacheLock.Lock()
defer r.metadataCacheLock.Unlock()
@ -204,10 +220,11 @@ func (r *BucketRegistry) removeMetadataCache(bucket string) {
delete(r.metadataCache, bucket)
}
func (r *BucketRegistry) markNotFound(bucket string) {
r.notFoundLock.Lock()
defer r.notFoundLock.Unlock()
r.notFound[bucket] = struct{}{}
func (r *BucketRegistry) isNotFound(bucket string) bool {
r.notFoundLock.RLock()
defer r.notFoundLock.RUnlock()
_, ok := r.notFound[bucket]
return ok
}
func (r *BucketRegistry) unMarkNotFound(bucket string) {

10
weed/s3api/bucket_metadata_test.go

@ -86,7 +86,7 @@ var tcs = []*BucketMetadataTestCase{
{
badEntry, &BucketMetaData{
Name: badEntry.Name,
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
ObjectOwnership: s3_constants.DefaultObjectOwnership,
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,
@ -108,7 +108,7 @@ var tcs = []*BucketMetadataTestCase{
{
ownershipEmptyStr, &BucketMetaData{
Name: ownershipEmptyStr.Name,
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
ObjectOwnership: s3_constants.DefaultObjectOwnership,
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,
@ -130,7 +130,7 @@ var tcs = []*BucketMetadataTestCase{
{
acpEmptyStr, &BucketMetaData{
Name: acpEmptyStr.Name,
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
ObjectOwnership: s3_constants.DefaultObjectOwnership,
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,
@ -141,7 +141,7 @@ var tcs = []*BucketMetadataTestCase{
{
acpEmptyObject, &BucketMetaData{
Name: acpEmptyObject.Name,
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
ObjectOwnership: s3_constants.DefaultObjectOwnership,
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,
@ -152,7 +152,7 @@ var tcs = []*BucketMetadataTestCase{
{
acpOwnerNil, &BucketMetaData{
Name: acpOwnerNil.Name,
ObjectOwnership: s3_constants.DefaultOwnershipForExists,
ObjectOwnership: s3_constants.DefaultObjectOwnership,
Owner: &s3.Owner{
DisplayName: &s3account.AccountAdmin.Name,
ID: &s3account.AccountAdmin.Id,

24
weed/s3api/filer_multipart.go

@ -5,6 +5,7 @@ import (
"encoding/xml"
"fmt"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"golang.org/x/exp/slices"
"math"
@ -27,7 +28,7 @@ type InitiateMultipartUploadResult struct {
s3.CreateMultipartUploadOutput
}
func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
func (s3a *S3ApiServer) createMultipartUpload(initiatorId string, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("createMultipartUpload input %v", input)
@ -46,6 +47,7 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
if input.ContentType != nil {
entry.Attributes.Mime = *input.ContentType
}
entry.Extended[s3_constants.ExtAmzMultipartInitiator] = []byte(initiatorId)
}); err != nil {
glog.Errorf("NewMultipartUpload error: %v", err)
return nil, s3err.ErrInternalError
@ -236,7 +238,7 @@ type ListMultipartUploadsResult struct {
Upload []*s3.MultipartUpload `locationName:"Upload" type:"list" flattened:"true"`
}
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) {
func (s3a *S3ApiServer) listMultipartUploads(bucketMetaData *BucketMetaData, input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html
glog.V(2).Infof("listMultipartUploads input %v", input)
@ -267,9 +269,27 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
if *input.Prefix != "" && !strings.HasPrefix(key, *input.Prefix) {
continue
}
initiatorId := string(entry.Extended[s3_constants.ExtAmzMultipartInitiator])
if initiatorId == "" {
initiatorId = *bucketMetaData.Owner.ID
}
initiatorDisplayName := s3a.accountManager.IdNameMapping[initiatorId]
ownerId := string(entry.Extended[s3_constants.ExtAmzOwnerKey])
if ownerId == "" {
ownerId = *bucketMetaData.Owner.ID
}
ownerDisplayName := s3a.accountManager.IdNameMapping[ownerId]
output.Upload = append(output.Upload, &s3.MultipartUpload{
Key: objectKey(aws.String(key)),
UploadId: aws.String(entry.Name),
Owner: &s3.Owner{
ID: &initiatorId,
DisplayName: &ownerDisplayName,
},
Initiator: &s3.Initiator{
ID: &initiatorId,
DisplayName: &initiatorDisplayName,
},
})
uploadsCount += 1
}

4
weed/s3api/s3_constants/acp_ownership.go

@ -4,9 +4,7 @@ var (
OwnershipBucketOwnerPreferred = "BucketOwnerPreferred"
OwnershipObjectWriter = "ObjectWriter"
OwnershipBucketOwnerEnforced = "BucketOwnerEnforced"
DefaultOwnershipForCreate = OwnershipObjectWriter
DefaultOwnershipForExists = OwnershipBucketOwnerEnforced
DefaultObjectOwnership = OwnershipBucketOwnerEnforced
)
func ValidateOwnership(ownership string) bool {

1
weed/s3api/s3_constants/extend_key.go

@ -2,6 +2,7 @@ package s3_constants
const (
ExtAmzOwnerKey = "Seaweed-X-Amz-Owner"
ExtAmzMultipartInitiator = "Seaweed-X-Amz-Multipart-Initiator"
ExtAmzAclKey = "Seaweed-X-Amz-Acl"
ExtOwnershipKey = "Seaweed-X-Amz-Ownership"
)

1
weed/s3api/s3_constants/header.go

@ -39,6 +39,7 @@ const (
X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key"
XSeaweedFSHeaderAmzBucketOwnerId = "x-seaweedfs-amz-bucket-owner-id"
XSeaweedFSHeaderAmzBucketAccessDenied = "x-seaweedfs-amz-bucket-access-denied"
// S3 ACL headers
AmzCannedAcl = "X-Amz-Acl"

365
weed/s3api/s3acl/acl_helper.go

@ -3,6 +3,7 @@ package s3acl
import (
"encoding/json"
"encoding/xml"
"fmt"
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -16,6 +17,8 @@ import (
"strings"
)
var customAclHeaders = []string{s3_constants.AmzAclFullControl, s3_constants.AmzAclRead, s3_constants.AmzAclReadAcp, s3_constants.AmzAclWrite, s3_constants.AmzAclWriteAcp}
// GetAccountId get AccountId from request headers, AccountAnonymousId will be return if not presen
func GetAccountId(r *http.Request) string {
id := r.Header.Get(s3_constants.AmzAccountId)
@ -26,11 +29,36 @@ func GetAccountId(r *http.Request) string {
}
}
// ExtractAcl extracts the acl from the request body, or from the header if request body is empty
func ExtractAcl(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, ownerId, accountId string) (grants []*s3.Grant, errCode s3err.ErrorCode) {
if r.Body != nil && r.Body != http.NoBody {
defer util.CloseRequest(r)
// ValidateAccount validate weather request account id is allowed to access
func ValidateAccount(requestAccountId string, allowedAccounts ...string) bool {
for _, allowedAccount := range allowedAccounts {
if requestAccountId == allowedAccount {
return true
}
}
return false
}
// ExtractBucketAcl extracts the acl from the request body, or from the header if request body is empty
func ExtractBucketAcl(r *http.Request, accountManager *s3account.AccountManager, objectOwnership, bucketOwnerId, requestAccountId string, createBucket bool) (grants []*s3.Grant, errCode s3err.ErrorCode) {
cannedAclPresent := false
if r.Header.Get(s3_constants.AmzCannedAcl) != "" {
cannedAclPresent = true
}
customAclPresent := false
for _, customAclHeader := range customAclHeaders {
if r.Header.Get(customAclHeader) != "" {
customAclPresent = true
break
}
}
// AccessControlList body is not support when create object/bucket
if !createBucket && r.Body != nil && r.Body != http.NoBody {
defer util.CloseRequest(r)
if cannedAclPresent || customAclPresent {
return nil, s3err.ErrUnexpectedContent
}
var acp s3.AccessControlPolicy
err := xmlutil.UnmarshalXML(&acp, xml.NewDecoder(r.Body), "")
if err != nil || acp.Owner == nil || acp.Owner.ID == nil {
@ -38,116 +66,127 @@ func ExtractAcl(r *http.Request, accountManager *s3account.AccountManager, owner
}
//owner should present && owner is immutable
if *acp.Owner.ID != ownerId {
glog.V(3).Infof("set acl denied! owner account is not consistent, request account id: %s, expect account id: %s", accountId, ownerId)
if *acp.Owner.ID == "" || *acp.Owner.ID != bucketOwnerId {
glog.V(3).Infof("set acl denied! owner account is not consistent, request account id: %s, expect account id: %s", *acp.Owner.ID, bucketOwnerId)
return nil, s3err.ErrAccessDenied
}
return ValidateAndTransferGrants(accountManager, acp.Grants)
grants = acp.Grants
} else {
_, grants, errCode = ParseAndValidateAclHeadersOrElseDefault(r, accountManager, ownership, bucketOwnerId, accountId, true)
return grants, errCode
if cannedAclPresent && customAclPresent {
return nil, s3err.ErrInvalidRequest
}
if cannedAclPresent {
grants, errCode = ExtractBucketCannedAcl(r, requestAccountId)
} else if customAclPresent {
grants, errCode = ExtractCustomAcl(r)
}
}
// ParseAndValidateAclHeadersOrElseDefault will callParseAndValidateAclHeaders to get Grants, if empty, it will return Grant that grant `accountId` with `FullControl` permission
func ParseAndValidateAclHeadersOrElseDefault(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) {
ownerId, grants, errCode = ParseAndValidateAclHeaders(r, accountManager, ownership, bucketOwnerId, accountId, putAcl)
if errCode != s3err.ErrNone {
return
return nil, errCode
}
if len(grants) == 0 {
//if no acl(both customAcl and cannedAcl) specified, grant accountId(object writer) with full control permission
grants = append(grants, &s3.Grant{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: &accountId,
},
Permission: &s3_constants.PermissionFullControl,
})
}
return
}
// ParseAndValidateAclHeaders parse and validate acl from header
func ParseAndValidateAclHeaders(r *http.Request, accountManager *s3account.AccountManager, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) {
ownerId, grants, errCode = ParseAclHeaders(r, ownership, bucketOwnerId, accountId, putAcl)
errCode = ValidateObjectOwnershipAndGrants(objectOwnership, bucketOwnerId, grants)
if errCode != s3err.ErrNone {
return
return nil, errCode
}
if len(grants) > 0 {
grants, errCode = ValidateAndTransferGrants(accountManager, grants)
if errCode != s3err.ErrNone {
return nil, errCode
}
return
return grants, s3err.ErrNone
}
// ParseAclHeaders parse acl headers
// When `putAcl` is true, only `CannedAcl` is parsed, such as `PutBucketAcl` or `PutObjectAcl`
// is requested, `CustomAcl` is parsed from the request body not from headers, and only if the
// request body is empty, `CannedAcl` is parsed from the header, and will not parse `CustomAcl` from the header
//
// Since `CustomAcl` has higher priority, it will be parsed first; if `CustomAcl` does not exist, `CannedAcl` will be parsed
func ParseAclHeaders(r *http.Request, ownership, bucketOwnerId, accountId string, putAcl bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) {
if !putAcl {
errCode = ParseCustomAclHeaders(r, &grants)
if errCode != s3err.ErrNone {
return "", nil, errCode
// ExtractObjectAcl extracts the acl from the request body, or from the header if request body is empty
func ExtractObjectAcl(r *http.Request, accountManager *s3account.AccountManager, objectOwnership, bucketOwnerId, requestAccountId string, createObject bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) {
cannedAclPresent := false
if r.Header.Get(s3_constants.AmzCannedAcl) != "" {
cannedAclPresent = true
}
customAclPresent := false
for _, customAclHeader := range customAclHeaders {
if r.Header.Get(customAclHeader) != "" {
customAclPresent = true
break
}
if len(grants) > 0 {
return accountId, grants, s3err.ErrNone
}
cannedAcl := r.Header.Get(s3_constants.AmzCannedAcl)
if len(cannedAcl) == 0 {
return accountId, grants, s3err.ErrNone
// AccessControlList body is not support when create object/bucket
if !createObject && r.Body != nil && r.Body != http.NoBody {
defer util.CloseRequest(r)
if cannedAclPresent || customAclPresent {
return "", nil, s3err.ErrUnexpectedContent
}
var acp s3.AccessControlPolicy
err := xmlutil.UnmarshalXML(&acp, xml.NewDecoder(r.Body), "")
if err != nil || acp.Owner == nil || acp.Owner.ID == nil {
return "", nil, s3err.ErrInvalidRequest
}
//if canned acl specified, parse cannedAcl (lower priority to custom acl)
ownerId, grants, errCode = ParseCannedAclHeader(ownership, bucketOwnerId, accountId, cannedAcl, putAcl)
//owner should present && owner is immutable
if *acp.Owner.ID == "" {
glog.V(1).Infof("Access denied! The owner id is required when specifying grants using AccessControlList")
return "", nil, s3err.ErrAccessDenied
}
ownerId = *acp.Owner.ID
grants = acp.Grants
} else {
if cannedAclPresent && customAclPresent {
return "", nil, s3err.ErrInvalidRequest
}
if cannedAclPresent {
ownerId, grants, errCode = ExtractObjectCannedAcl(r, objectOwnership, bucketOwnerId, requestAccountId, createObject)
} else {
grants, errCode = ExtractCustomAcl(r)
}
if errCode != s3err.ErrNone {
return "", nil, errCode
}
}
errCode = ValidateObjectOwnershipAndGrants(objectOwnership, bucketOwnerId, grants)
if errCode != s3err.ErrNone {
return "", nil, errCode
}
grants, errCode = ValidateAndTransferGrants(accountManager, grants)
return ownerId, grants, errCode
}
func ParseCustomAclHeaders(r *http.Request, grants *[]*s3.Grant) s3err.ErrorCode {
customAclHeaders := []string{s3_constants.AmzAclFullControl, s3_constants.AmzAclRead, s3_constants.AmzAclReadAcp, s3_constants.AmzAclWrite, s3_constants.AmzAclWriteAcp}
func ExtractCustomAcl(r *http.Request) ([]*s3.Grant, s3err.ErrorCode) {
var errCode s3err.ErrorCode
var grants []*s3.Grant
for _, customAclHeader := range customAclHeaders {
headerValue := r.Header.Get(customAclHeader)
switch customAclHeader {
case s3_constants.AmzAclRead:
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionRead, grants)
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionRead, &grants)
case s3_constants.AmzAclWrite:
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWrite, grants)
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWrite, &grants)
case s3_constants.AmzAclReadAcp:
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionReadAcp, grants)
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionReadAcp, &grants)
case s3_constants.AmzAclWriteAcp:
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWriteAcp, grants)
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionWriteAcp, &grants)
case s3_constants.AmzAclFullControl:
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionFullControl, grants)
errCode = ParseCustomAclHeader(headerValue, s3_constants.PermissionFullControl, &grants)
default:
errCode = s3err.ErrInvalidAclArgument
}
if errCode != s3err.ErrNone {
return errCode
return nil, errCode
}
}
return s3err.ErrNone
return grants, s3err.ErrNone
}
func ParseCustomAclHeader(headerValue, permission string, grants *[]*s3.Grant) s3err.ErrorCode {
if len(headerValue) > 0 {
split := strings.Split(headerValue, ", ")
split := strings.Split(headerValue, ",")
for _, grantStr := range split {
kv := strings.Split(grantStr, "=")
if len(kv) != 2 {
return s3err.ErrInvalidRequest
}
switch kv[0] {
switch strings.TrimSpace(kv[0]) {
case "id":
var accountId string
_ = json.Unmarshal([]byte(kv[1]), &accountId)
accountId := decodeGranteeValue(kv[1])
*grants = append(*grants, &s3.Grant{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
@ -156,8 +195,7 @@ func ParseCustomAclHeader(headerValue, permission string, grants *[]*s3.Grant) s
Permission: &permission,
})
case "emailAddress":
var emailAddress string
_ = json.Unmarshal([]byte(kv[1]), &emailAddress)
emailAddress := decodeGranteeValue(kv[1])
*grants = append(*grants, &s3.Grant{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeAmazonCustomerByEmail,
@ -167,7 +205,7 @@ func ParseCustomAclHeader(headerValue, permission string, grants *[]*s3.Grant) s
})
case "uri":
var groupName string
_ = json.Unmarshal([]byte(kv[1]), &groupName)
groupName = decodeGranteeValue(kv[1])
*grants = append(*grants, &s3.Grant{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeGroup,
@ -179,17 +217,66 @@ func ParseCustomAclHeader(headerValue, permission string, grants *[]*s3.Grant) s
}
}
return s3err.ErrNone
}
func decodeGranteeValue(value string) (result string) {
if !strings.HasPrefix(value, "\"") {
return value
}
_ = json.Unmarshal([]byte(value), &result)
if result == "" {
result = value
}
return result
}
func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl string, putAcl bool) (ownerId string, grants []*s3.Grant, err s3err.ErrorCode) {
// ExtractBucketCannedAcl parse bucket canned acl, includes: 'private'|'public-read'|'public-read-write'|'authenticated-read'
func ExtractBucketCannedAcl(request *http.Request, requestAccountId string) (grants []*s3.Grant, err s3err.ErrorCode) {
cannedAcl := request.Header.Get(s3_constants.AmzCannedAcl)
if cannedAcl == "" {
return grants, s3err.ErrNone
}
err = s3err.ErrNone
ownerId = accountId
objectWriterFullControl := &s3.Grant{
Grantee: &s3.Grantee{
ID: &requestAccountId,
Type: &s3_constants.GrantTypeCanonicalUser,
},
Permission: &s3_constants.PermissionFullControl,
}
switch cannedAcl {
case s3_constants.CannedAclPrivate:
grants = append(grants, objectWriterFullControl)
case s3_constants.CannedAclPublicRead:
grants = append(grants, objectWriterFullControl)
grants = append(grants, s3_constants.PublicRead...)
case s3_constants.CannedAclPublicReadWrite:
grants = append(grants, objectWriterFullControl)
grants = append(grants, s3_constants.PublicReadWrite...)
case s3_constants.CannedAclAuthenticatedRead:
grants = append(grants, objectWriterFullControl)
grants = append(grants, s3_constants.AuthenticatedRead...)
default:
err = s3err.ErrInvalidAclArgument
}
return
}
// ExtractObjectCannedAcl parse object canned acl, includes: 'private'|'public-read'|'public-read-write'|'authenticated-read'|'aws-exec-read'|'bucket-owner-read'|'bucket-owner-full-control'
func ExtractObjectCannedAcl(request *http.Request, objectOwnership, bucketOwnerId, requestAccountId string, createObject bool) (ownerId string, grants []*s3.Grant, errCode s3err.ErrorCode) {
if createObject {
ownerId = requestAccountId
}
cannedAcl := request.Header.Get(s3_constants.AmzCannedAcl)
if cannedAcl == "" {
return ownerId, grants, s3err.ErrNone
}
//objectWrite automatically has full control on current object
errCode = s3err.ErrNone
objectWriterFullControl := &s3.Grant{
Grantee: &s3.Grantee{
ID: &accountId,
ID: &requestAccountId,
Type: &s3_constants.GrantTypeCanonicalUser,
},
Permission: &s3_constants.PermissionFullControl,
@ -212,7 +299,7 @@ func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl s
grants = append(grants, s3_constants.LogDeliveryWrite...)
case s3_constants.CannedAclBucketOwnerRead:
grants = append(grants, objectWriterFullControl)
if bucketOwnerId != "" && bucketOwnerId != accountId {
if requestAccountId != bucketOwnerId {
grants = append(grants,
&s3.Grant{
Grantee: &s3.Grantee{
@ -225,7 +312,7 @@ func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl s
case s3_constants.CannedAclBucketOwnerFullControl:
if bucketOwnerId != "" {
// if set ownership to 'BucketOwnerPreferred' when upload object, the bucket owner will be the object owner
if !putAcl && bucketOwnership == s3_constants.OwnershipBucketOwnerPreferred {
if createObject && objectOwnership == s3_constants.OwnershipBucketOwnerPreferred {
ownerId = bucketOwnerId
grants = append(grants,
&s3.Grant{
@ -237,7 +324,7 @@ func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl s
})
} else {
grants = append(grants, objectWriterFullControl)
if accountId != bucketOwnerId {
if requestAccountId != bucketOwnerId {
grants = append(grants,
&s3.Grant{
Grantee: &s3.Grantee{
@ -249,15 +336,13 @@ func ParseCannedAclHeader(bucketOwnership, bucketOwnerId, accountId, cannedAcl s
}
}
}
case s3_constants.CannedAclAwsExecRead:
err = s3err.ErrNotImplemented
default:
err = s3err.ErrInvalidRequest
errCode = s3err.ErrInvalidAclArgument
}
return
}
// ValidateAndTransferGrants validate grant & transfer Email-Grant to Id-Grant
// ValidateAndTransferGrants validate grant entity exists and transfer Email-Grant to Id-Grant
func ValidateAndTransferGrants(accountManager *s3account.AccountManager, grants []*s3.Grant) ([]*s3.Grant, s3err.ErrorCode) {
var result []*s3.Grant
for _, grant := range grants {
@ -314,15 +399,43 @@ func ValidateAndTransferGrants(accountManager *s3account.AccountManager, grants
return result, s3err.ErrNone
}
// DetermineReqGrants generates the grant set (Grants) according to accountId and reqPermission.
func DetermineReqGrants(accountId, aclAction string) (grants []*s3.Grant) {
// ValidateObjectOwnershipAndGrants validate if grants equals OwnerFullControl when 'ObjectOwnership' is 'BucketOwnerEnforced'
func ValidateObjectOwnershipAndGrants(objectOwnership, bucketOwnerId string, grants []*s3.Grant) s3err.ErrorCode {
if len(grants) == 0 {
return s3err.ErrNone
}
if objectOwnership == "" {
objectOwnership = s3_constants.DefaultObjectOwnership
}
if objectOwnership != s3_constants.OwnershipBucketOwnerEnforced {
return s3err.ErrNone
}
if len(grants) > 1 {
return s3err.AccessControlListNotSupported
}
bucketOwnerFullControlGrant := &s3.Grant{
Permission: &s3_constants.PermissionFullControl,
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: &bucketOwnerId,
},
}
if GrantEquals(bucketOwnerFullControlGrant, grants[0]) {
return s3err.ErrNone
}
return s3err.AccessControlListNotSupported
}
// DetermineRequiredGrants generates the grant set (Grants) according to accountId and reqPermission.
func DetermineRequiredGrants(accountId, permission string) (grants []*s3.Grant) {
// group grantee (AllUsers)
grants = append(grants, &s3.Grant{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeGroup,
URI: &s3_constants.GranteeGroupAllUsers,
},
Permission: &aclAction,
Permission: &permission,
})
grants = append(grants, &s3.Grant{
Grantee: &s3.Grantee{
@ -338,7 +451,7 @@ func DetermineReqGrants(accountId, aclAction string) (grants []*s3.Grant) {
Type: &s3_constants.GrantTypeCanonicalUser,
ID: &accountId,
},
Permission: &aclAction,
Permission: &permission,
})
grants = append(grants, &s3.Grant{
Grantee: &s3.Grantee{
@ -355,7 +468,7 @@ func DetermineReqGrants(accountId, aclAction string) (grants []*s3.Grant) {
Type: &s3_constants.GrantTypeGroup,
URI: &s3_constants.GranteeGroupAuthenticatedUsers,
},
Permission: &aclAction,
Permission: &permission,
})
grants = append(grants, &s3.Grant{
Grantee: &s3.Grantee{
@ -392,7 +505,7 @@ func SetAcpGrantsHeader(r *http.Request, acpGrants []*s3.Grant) {
}
// GetAcpGrants return grants parsed from entry
func GetAcpGrants(entryExtended map[string][]byte) []*s3.Grant {
func GetAcpGrants(ownerId *string, entryExtended map[string][]byte) []*s3.Grant {
acpBytes, ok := entryExtended[s3_constants.ExtAmzAclKey]
if ok && len(acpBytes) > 0 {
var grants []*s3.Grant
@ -400,31 +513,43 @@ func GetAcpGrants(entryExtended map[string][]byte) []*s3.Grant {
if err == nil {
return grants
}
glog.Warning("grants Unmarshal error", err)
}
if ownerId == nil {
return nil
}
return []*s3.Grant{
{
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: ownerId,
},
Permission: &s3_constants.PermissionFullControl,
},
}
}
// AssembleEntryWithAcp fill entry with owner and grants
func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grants []*s3.Grant) s3err.ErrorCode {
if objectEntry.Extended == nil {
objectEntry.Extended = make(map[string][]byte)
func AssembleEntryWithAcp(filerEntry *filer_pb.Entry, ownerId string, grants []*s3.Grant) s3err.ErrorCode {
if filerEntry.Extended == nil {
filerEntry.Extended = make(map[string][]byte)
}
if len(objectOwner) > 0 {
objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner)
if len(ownerId) > 0 {
filerEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(ownerId)
} else {
delete(objectEntry.Extended, s3_constants.ExtAmzOwnerKey)
delete(filerEntry.Extended, s3_constants.ExtAmzOwnerKey)
}
if len(grants) > 0 {
if grants != nil {
grantsBytes, err := json.Marshal(grants)
if err != nil {
glog.Warning("assemble acp to entry:", err)
return s3err.ErrInvalidRequest
}
objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes
filerEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes
} else {
delete(objectEntry.Extended, s3_constants.ExtAmzAclKey)
delete(filerEntry.Extended, s3_constants.ExtAmzAclKey)
}
return s3err.ErrNone
@ -509,6 +634,46 @@ func GrantEquals(a, b *s3.Grant) bool {
return true
}
func MarshalGrantsToJson(grants []*s3.Grant) ([]byte, error) {
if len(grants) == 0 {
return nil, nil
}
var GrantsToMap []map[string]any
for _, grant := range grants {
grantee := grant.Grantee
switch *grantee.Type {
case s3_constants.GrantTypeGroup:
GrantsToMap = append(GrantsToMap, map[string]any{
"Permission": grant.Permission,
"Grantee": map[string]any{
"Type": grantee.Type,
"URI": grantee.URI,
},
})
case s3_constants.GrantTypeCanonicalUser:
GrantsToMap = append(GrantsToMap, map[string]any{
"Permission": grant.Permission,
"Grantee": map[string]any{
"Type": grantee.Type,
"ID": grantee.ID,
},
})
case s3_constants.GrantTypeAmazonCustomerByEmail:
GrantsToMap = append(GrantsToMap, map[string]any{
"Permission": grant.Permission,
"Grantee": map[string]any{
"Type": grantee.Type,
"EmailAddress": grantee.EmailAddress,
},
})
default:
return nil, fmt.Errorf("grantee type[%s] is not valid", *grantee.Type)
}
}
return json.Marshal(GrantsToMap)
}
func GrantWithFullControl(accountId string) *s3.Grant {
return &s3.Grant{
Permission: &s3_constants.PermissionFullControl,
@ -524,22 +689,22 @@ func CheckObjectAccessForReadObject(r *http.Request, w http.ResponseWriter, entr
return http.StatusOK, true
}
accountId := GetAccountId(r)
if len(accountId) == 0 {
requestAccountId := GetAccountId(r)
if len(requestAccountId) == 0 {
glog.Warning("#checkObjectAccessForReadObject header[accountId] not exists!")
return http.StatusForbidden, false
}
//owner access
objectOwner := GetAcpOwner(entry.Extended, bucketOwnerId)
if accountId == objectOwner {
if ValidateAccount(requestAccountId, objectOwner) {
return http.StatusOK, true
}
//find in Grants
acpGrants := GetAcpGrants(entry.Extended)
acpGrants := GetAcpGrants(nil, entry.Extended)
if acpGrants != nil {
reqGrants := DetermineReqGrants(accountId, s3_constants.PermissionRead)
reqGrants := DetermineRequiredGrants(requestAccountId, s3_constants.PermissionRead)
for _, requiredGrant := range reqGrants {
for _, grant := range acpGrants {
if GrantEquals(requiredGrant, grant) {
@ -549,6 +714,6 @@ func CheckObjectAccessForReadObject(r *http.Request, w http.ResponseWriter, entr
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
glog.V(3).Infof("acl denied! request account id: %s", requestAccountId)
return http.StatusForbidden, false
}

1391
weed/s3api/s3acl/acl_helper_test.go
File diff suppressed because it is too large
View File

300
weed/s3api/s3api_acp.go

@ -23,19 +23,19 @@ func getAccountId(r *http.Request) string {
}
func (s3a *S3ApiServer) checkAccessByOwnership(r *http.Request, bucket string) s3err.ErrorCode {
metadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
accountId := getAccountId(r)
if accountId == s3account.AccountAdmin.Id || accountId == *metadata.Owner.ID {
requestAccountId := getAccountId(r)
if s3acl.ValidateAccount(requestAccountId, *bucketMetadata.Owner.ID) {
return s3err.ErrNone
}
return s3err.ErrAccessDenied
}
//Check access for PutBucketAclHandler
func (s3a *S3ApiServer) checkAccessForPutBucketAcl(accountId, bucket string) (*BucketMetaData, s3err.ErrorCode) {
func (s3a *S3ApiServer) checkAccessForPutBucketAcl(requestAccountId, bucket string) (*BucketMetaData, s3err.ErrorCode) {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return nil, errCode
@ -45,12 +45,12 @@ func (s3a *S3ApiServer) checkAccessForPutBucketAcl(accountId, bucket string) (*B
return nil, s3err.AccessControlListNotSupported
}
if accountId == s3account.AccountAdmin.Id || accountId == *bucketMetadata.Owner.ID {
if s3acl.ValidateAccount(requestAccountId, *bucketMetadata.Owner.ID) {
return bucketMetadata, s3err.ErrNone
}
if len(bucketMetadata.Acl) > 0 {
reqGrants := s3acl.DetermineReqGrants(accountId, s3_constants.PermissionWriteAcp)
reqGrants := s3acl.DetermineRequiredGrants(requestAccountId, s3_constants.PermissionWriteAcp)
for _, bucketGrant := range bucketMetadata.Acl {
for _, reqGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, reqGrant) {
@ -59,7 +59,7 @@ func (s3a *S3ApiServer) checkAccessForPutBucketAcl(accountId, bucket string) (*B
}
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
glog.V(3).Infof("acl denied! request account id: %s", requestAccountId)
return nil, s3err.ErrAccessDenied
}
@ -73,6 +73,7 @@ func updateBucketEntry(s3a *S3ApiServer, entry *filer_pb.Entry) error {
// - GetBucketAclHandler
// - ListObjectsV1Handler
// - ListObjectsV2Handler
// - ListMultipartUploadsHandler
func (s3a *S3ApiServer) checkAccessForReadBucket(r *http.Request, bucket, aclAction string) (*BucketMetaData, s3err.ErrorCode) {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
@ -83,13 +84,13 @@ func (s3a *S3ApiServer) checkAccessForReadBucket(r *http.Request, bucket, aclAct
return bucketMetadata, s3err.ErrNone
}
accountId := s3acl.GetAccountId(r)
if accountId == s3account.AccountAdmin.Id || accountId == *bucketMetadata.Owner.ID {
requestAccountId := s3acl.GetAccountId(r)
if s3acl.ValidateAccount(requestAccountId, *bucketMetadata.Owner.ID) {
return bucketMetadata, s3err.ErrNone
}
if len(bucketMetadata.Acl) > 0 {
reqGrants := s3acl.DetermineReqGrants(accountId, aclAction)
reqGrants := s3acl.DetermineRequiredGrants(requestAccountId, aclAction)
for _, bucketGrant := range bucketMetadata.Acl {
for _, reqGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, reqGrant) {
@ -99,7 +100,7 @@ func (s3a *S3ApiServer) checkAccessForReadBucket(r *http.Request, bucket, aclAct
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
glog.V(3).Infof("acl denied! request account id: %s", requestAccountId)
return nil, s3err.ErrAccessDenied
}
@ -121,14 +122,12 @@ func (s3a *S3ApiServer) checkAccessForReadObjectAcl(r *http.Request, bucket, obj
return nil, s3err.ErrInternalError
}
}
if entry.IsDirectory {
return nil, s3err.ErrExistingObjectIsDirectory
}
acpOwnerId := s3acl.GetAcpOwner(entry.Extended, *bucketMetadata.Owner.ID)
acpOwnerName := s3a.accountManager.IdNameMapping[acpOwnerId]
acpGrants := s3acl.GetAcpGrants(entry.Extended)
acpGrants := s3acl.GetAcpGrants(&acpOwnerId, entry.Extended)
acp = &s3.AccessControlPolicy{
Owner: &s3.Owner{
ID: &acpOwnerId,
@ -138,24 +137,19 @@ func (s3a *S3ApiServer) checkAccessForReadObjectAcl(r *http.Request, bucket, obj
}
return acp, s3err.ErrNone
}
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return getAcpFunc()
} else {
accountId := s3acl.GetAccountId(r)
acp, errCode := getAcpFunc()
}
requestAccountId := s3acl.GetAccountId(r)
acp, errCode = getAcpFunc()
if errCode != s3err.ErrNone {
return nil, errCode
}
if accountId == *acp.Owner.ID {
if s3acl.ValidateAccount(requestAccountId, *acp.Owner.ID) {
return acp, s3err.ErrNone
}
//find in Grants
if acp.Grants != nil {
reqGrants := s3acl.DetermineReqGrants(accountId, s3_constants.PermissionReadAcp)
reqGrants := s3acl.DetermineRequiredGrants(requestAccountId, s3_constants.PermissionReadAcp)
for _, requiredGrant := range reqGrants {
for _, grant := range acp.Grants {
if s3acl.GrantEquals(requiredGrant, grant) {
@ -164,10 +158,8 @@ func (s3a *S3ApiServer) checkAccessForReadObjectAcl(r *http.Request, bucket, obj
}
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
glog.V(3).Infof("CheckAccessForReadObjectAcl denied! request account id: %s", requestAccountId)
return nil, s3err.ErrAccessDenied
}
}
// Check Object-Read related access
@ -184,6 +176,10 @@ func (s3a *S3ApiServer) checkBucketAccessForReadObject(r *http.Request, bucket s
if bucketMetadata.ObjectOwnership != s3_constants.OwnershipBucketOwnerEnforced {
//offload object acl validation to filer layer
_, defaultErrorCode := s3a.checkAccessForReadBucket(r, bucket, s3_constants.PermissionRead)
if defaultErrorCode != s3err.ErrNone {
r.Header.Set(s3_constants.XSeaweedFSHeaderAmzBucketAccessDenied, "true")
}
r.Header.Set(s3_constants.XSeaweedFSHeaderAmzBucketOwnerId, *bucketMetadata.Owner.ID)
}
@ -193,67 +189,58 @@ func (s3a *S3ApiServer) checkBucketAccessForReadObject(r *http.Request, bucket s
// Check ObjectAcl-Write related access
// includes:
// - PutObjectAclHandler
func (s3a *S3ApiServer) checkAccessForWriteObjectAcl(accountId, bucket, object string) (bucketMetadata *BucketMetaData, objectEntry *filer_pb.Entry, objectOwner string, errCode s3err.ErrorCode) {
bucketMetadata, errCode = s3a.bucketRegistry.GetBucketMetadata(bucket)
func (s3a *S3ApiServer) checkAccessForWriteObjectAcl(r *http.Request, bucket, object string) (*filer_pb.Entry, string, []*s3.Grant, s3err.ErrorCode) {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return nil, nil, "", errCode
return nil, "", nil, errCode
}
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return nil, nil, "", s3err.AccessControlListNotSupported
requestAccountId := s3acl.GetAccountId(r)
reqOwnerId, grants, errCode := s3acl.ExtractObjectAcl(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, requestAccountId, false)
if errCode != s3err.ErrNone {
return nil, "", nil, errCode
}
//bucket acl
bucketAclAllowed := false
reqGrants := s3acl.DetermineReqGrants(accountId, s3_constants.PermissionWrite)
if accountId == *bucketMetadata.Owner.ID {
bucketAclAllowed = true
} else if bucketMetadata.Acl != nil {
bucketLoop:
for _, bucketGrant := range bucketMetadata.Acl {
for _, requiredGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, requiredGrant) {
bucketAclAllowed = true
break bucketLoop
}
}
}
}
if !bucketAclAllowed {
return nil, nil, "", s3err.ErrAccessDenied
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return nil, "", nil, s3err.AccessControlListNotSupported
}
//object acl
objectEntry, err := getObjectEntry(s3a, bucket, object)
if err != nil {
if err == filer_pb.ErrNotFound {
return nil, nil, "", s3err.ErrNoSuchKey
return nil, "", nil, s3err.ErrNoSuchKey
}
return nil, nil, "", s3err.ErrInternalError
return nil, "", nil, s3err.ErrInternalError
}
if objectEntry.IsDirectory {
return nil, nil, "", s3err.ErrExistingObjectIsDirectory
return nil, "", nil, s3err.ErrExistingObjectIsDirectory
}
objectOwner = s3acl.GetAcpOwner(objectEntry.Extended, *bucketMetadata.Owner.ID)
if accountId == objectOwner {
return bucketMetadata, objectEntry, objectOwner, s3err.ErrNone
objectOwner := s3acl.GetAcpOwner(objectEntry.Extended, *bucketMetadata.Owner.ID)
//object owner is immutable
if reqOwnerId != "" && reqOwnerId != objectOwner {
return nil, "", nil, s3err.ErrAccessDenied
}
if s3acl.ValidateAccount(requestAccountId, objectOwner) {
return objectEntry, objectOwner, grants, s3err.ErrNone
}
objectGrants := s3acl.GetAcpGrants(objectEntry.Extended)
objectGrants := s3acl.GetAcpGrants(nil, objectEntry.Extended)
if objectGrants != nil {
requiredGrants := s3acl.DetermineRequiredGrants(requestAccountId, s3_constants.PermissionWriteAcp)
for _, objectGrant := range objectGrants {
for _, requiredGrant := range reqGrants {
for _, requiredGrant := range requiredGrants {
if s3acl.GrantEquals(objectGrant, requiredGrant) {
return bucketMetadata, objectEntry, objectOwner, s3err.ErrNone
return objectEntry, objectOwner, grants, s3err.ErrNone
}
}
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
return nil, nil, "", s3err.ErrAccessDenied
glog.V(3).Infof("checkAccessForWriteObjectAcl denied! request account id: %s", requestAccountId)
return nil, "", nil, s3err.ErrAccessDenied
}
func updateObjectEntry(s3a *S3ApiServer, bucket, object string, entry *filer_pb.Entry) error {
@ -269,150 +256,159 @@ func (s3a *S3ApiServer) CheckAccessForPutObject(r *http.Request, bucket, object
return s3a.checkAccessForWriteObject(r, bucket, object, accountId)
}
// CheckAccessForNewMultipartUpload Check Acl for InitiateMultipartUploadResult API
// CheckAccessForPutObjectPartHandler Check Acl for Upload object part
// includes:
// - NewMultipartUploadHandler
func (s3a *S3ApiServer) CheckAccessForNewMultipartUpload(r *http.Request, bucket, object string) s3err.ErrorCode {
// - PutObjectPartHandler
func (s3a *S3ApiServer) CheckAccessForPutObjectPartHandler(r *http.Request, bucket string) s3err.ErrorCode {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return s3err.ErrNone
}
accountId := s3acl.GetAccountId(r)
if accountId == IdentityAnonymous.AccountId {
if !CheckBucketAccess(accountId, bucketMetadata, s3_constants.PermissionWrite) {
return s3err.ErrAccessDenied
}
return s3a.checkAccessForWriteObject(r, bucket, object, accountId)
return s3err.ErrNone
}
// CheckAccessForCompleteMultipartUpload Check Acl for CompleteMultipartUpload API
// CheckAccessForNewMultipartUpload Check Acl for API
// includes:
// - CompleteMultipartUploadHandler
// - NewMultipartUploadHandler
func (s3a *S3ApiServer) CheckAccessForNewMultipartUpload(r *http.Request, bucket, object string) (s3err.ErrorCode, string) {
accountId := s3acl.GetAccountId(r)
if accountId == IdentityAnonymous.AccountId {
return s3err.ErrAccessDenied, ""
}
errCode := s3a.checkAccessForWriteObject(r, bucket, object, accountId)
return errCode, accountId
}
func (s3a *S3ApiServer) CheckAccessForAbortMultipartUpload(r *http.Request, bucket, object string) s3err.ErrorCode {
return s3a.CheckAccessWithBucketOwnerAndInitiator(r, bucket, object)
}
func (s3a *S3ApiServer) CheckAccessForCompleteMultipartUpload(r *http.Request, bucket, object string) s3err.ErrorCode {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
if bucketMetadata.ObjectOwnership != s3_constants.OwnershipBucketOwnerEnforced {
accountId := getAccountId(r)
if !CheckBucketAccess(accountId, bucketMetadata, s3_constants.PermissionWrite) {
return s3err.ErrAccessDenied
}
}
return s3err.ErrNone
}
func (s3a *S3ApiServer) CheckAccessForListMultipartUploadParts(r *http.Request, bucket, object string) s3err.ErrorCode {
return s3a.CheckAccessWithBucketOwnerAndInitiator(r, bucket, object)
}
// CheckAccessWithBucketOwnerAndInitiator Check Access Permission with 'bucketOwner' and 'multipartUpload initiator'
func (s3a *S3ApiServer) CheckAccessWithBucketOwnerAndInitiator(r *http.Request, bucket, object string) s3err.ErrorCode {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
//bucket access allowed
accountId := s3acl.GetAccountId(r)
if accountId == *bucketMetadata.Owner.ID {
return s3err.ErrNone
} else {
if len(bucketMetadata.Acl) > 0 {
reqGrants := s3acl.DetermineReqGrants(accountId, s3_constants.PermissionWrite)
for _, bucketGrant := range bucketMetadata.Acl {
for _, requiredGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, requiredGrant) {
if s3acl.ValidateAccount(*bucketMetadata.Owner.ID, accountId) {
return s3err.ErrNone
}
//multipart initiator allowed
entry, err := getMultipartUpload(s3a, bucket, object)
if err != nil {
if err != filer_pb.ErrNotFound {
return s3err.ErrInternalError
}
} else {
uploadInitiator, ok := entry.Extended[s3_constants.ExtAmzMultipartInitiator]
if !ok || accountId == string(uploadInitiator) {
return s3err.ErrNone
}
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
glog.V(3).Infof("CheckAccessWithBucketOwnerAndInitiator denied! request account id: %s", accountId)
return s3err.ErrAccessDenied
}
func (s3a *S3ApiServer) checkAccessForWriteObject(r *http.Request, bucket, object, accountId string) s3err.ErrorCode {
func (s3a *S3ApiServer) checkAccessForWriteObject(r *http.Request, bucket, object, requestAccountId string) s3err.ErrorCode {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
// validate grants (only bucketOwnerFullControl acl is allowed)
_, grants, errCode := s3acl.ParseAndValidateAclHeaders(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, accountId, false)
requestOwnerId, grants, errCode := s3acl.ExtractObjectAcl(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, requestAccountId, true)
if errCode != s3err.ErrNone {
return errCode
}
if len(grants) > 1 {
return s3err.AccessControlListNotSupported
}
bucketOwnerFullControlGrant := &s3.Grant{
Permission: &s3_constants.PermissionFullControl,
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: bucketMetadata.Owner.ID,
},
}
if len(grants) == 0 {
return s3err.ErrNone
}
if s3acl.GrantEquals(bucketOwnerFullControlGrant, grants[0]) {
if bucketMetadata.ObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return s3err.ErrNone
}
return s3err.AccessControlListNotSupported
}
//bucket access allowed
bucketAclAllowed := false
if accountId == *bucketMetadata.Owner.ID {
bucketAclAllowed = true
} else {
if len(bucketMetadata.Acl) > 0 {
reqGrants := s3acl.DetermineReqGrants(accountId, s3_constants.PermissionWrite)
bucketLoop:
for _, bucketGrant := range bucketMetadata.Acl {
for _, requiredGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, requiredGrant) {
bucketAclAllowed = true
break bucketLoop
}
}
}
}
}
if !bucketAclAllowed {
glog.V(3).Infof("acl denied! request account id: %s", accountId)
if !CheckBucketAccess(requestAccountId, bucketMetadata, s3_constants.PermissionWrite) {
return s3err.ErrAccessDenied
}
//object access allowed
if requestOwnerId == "" {
requestOwnerId = requestAccountId
}
entry, err := getObjectEntry(s3a, bucket, object)
if err != nil {
if err != filer_pb.ErrNotFound {
return s3err.ErrInternalError
if err == filer_pb.ErrNotFound {
s3acl.SetAcpOwnerHeader(r, requestOwnerId)
s3acl.SetAcpGrantsHeader(r, grants)
return s3err.ErrNone
}
} else {
if entry.IsDirectory {
return s3err.ErrExistingObjectIsDirectory
return s3err.ErrInternalError
}
//Only the owner of the bucket and the owner of the object can overwrite the object
objectOwner := s3acl.GetAcpOwner(entry.Extended, *bucketMetadata.Owner.ID)
if accountId != objectOwner && accountId != *bucketMetadata.Owner.ID {
glog.V(3).Infof("acl denied! request account id: %s, expect account id: %s", accountId, *bucketMetadata.Owner.ID)
objectOwnerId := s3acl.GetAcpOwner(entry.Extended, *bucketMetadata.Owner.ID)
//object owner is immutable
if requestOwnerId != "" && objectOwnerId != requestOwnerId {
return s3err.ErrAccessDenied
}
}
ownerId, grants, errCode := s3acl.ParseAndValidateAclHeadersOrElseDefault(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, accountId, false)
if errCode != s3err.ErrNone {
return errCode
//Only the owner of the bucket and the owner of the object can overwrite the object
if s3acl.ValidateAccount(requestOwnerId, objectOwnerId, *bucketMetadata.Owner.ID) {
glog.V(3).Infof("checkAccessForWriteObject denied! request account id: %s, expect account id: %s", requestAccountId, *bucketMetadata.Owner.ID)
return s3err.ErrAccessDenied
}
s3acl.SetAcpOwnerHeader(r, ownerId)
s3acl.SetAcpOwnerHeader(r, objectOwnerId)
s3acl.SetAcpGrantsHeader(r, grants)
return s3err.ErrNone
}
func CheckBucketAccess(requestAccountId string, bucketMetadata *BucketMetaData, permission string) bool {
if s3acl.ValidateAccount(requestAccountId, *bucketMetadata.Owner.ID) {
return true
} else {
if len(bucketMetadata.Acl) > 0 {
reqGrants := s3acl.DetermineRequiredGrants(requestAccountId, permission)
for _, bucketGrant := range bucketMetadata.Acl {
for _, requiredGrant := range reqGrants {
if s3acl.GrantEquals(bucketGrant, requiredGrant) {
return true
}
}
}
}
}
glog.V(3).Infof("CheckBucketAccess denied! request account id: %s", requestAccountId)
return false
}
func getObjectEntry(s3a *S3ApiServer, bucket, object string) (*filer_pb.Entry, error) {
return s3a.getEntry(util.Join(s3a.option.BucketsPath, bucket), object)
}
func (s3a *S3ApiServer) ExtractBucketAcp(r *http.Request, objectOwnership string) (owner string, grants []*s3.Grant, errCode s3err.ErrorCode) {
accountId := s3acl.GetAccountId(r)
if objectOwnership == "" || objectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
return accountId, []*s3.Grant{
{
Permission: &s3_constants.PermissionFullControl,
Grantee: &s3.Grantee{
Type: &s3_constants.GrantTypeCanonicalUser,
ID: &accountId,
},
},
}, s3err.ErrNone
} else {
return s3acl.ParseAndValidateAclHeadersOrElseDefault(r, s3a.accountManager, objectOwnership, accountId, accountId, false)
}
func getMultipartUpload(s3a *S3ApiServer, bucket, object string) (*filer_pb.Entry, error) {
return s3a.getEntry(s3a.genUploadsFolder(bucket), object)
}

43
weed/s3api/s3api_bucket_handlers.go

@ -123,8 +123,9 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
}
}
objectOwnership := r.Header.Get("ObjectOwnership")
acpOwner, acpGrants, errCode := s3a.ExtractBucketAcp(r, objectOwnership)
objectOwnership := r.Header.Get("X-Amz-Object-Ownership")
requestAccountId := s3acl.GetAccountId(r)
grants, errCode := s3acl.ExtractBucketAcl(r, s3a.accountManager, objectOwnership, requestAccountId, requestAccountId, true)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@ -138,9 +139,12 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
entry.Extended[s3_constants.AmzIdentityId] = []byte(identityId)
}
if objectOwnership != "" {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtOwnershipKey] = []byte(objectOwnership)
}
s3acl.AssembleEntryWithAcp(entry, acpOwner, acpGrants)
s3acl.AssembleEntryWithAcp(entry, requestAccountId, grants)
}
// create the folder for bucket, but lazily create actual collection
@ -269,7 +273,7 @@ func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Reque
return
}
grants, errCode := s3acl.ExtractAcl(r, s3a.accountManager, bucketMetadata.ObjectOwnership, "", *bucketMetadata.Owner.ID, accountId)
grants, errCode := s3acl.ExtractBucketAcl(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, accountId, false)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@ -293,6 +297,8 @@ func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Reque
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
//update local cache
bucketMetadata.Acl = grants
s3err.WriteEmptyResponse(w, r, http.StatusOK)
}
@ -402,9 +408,8 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt
return
}
var v s3.OwnershipControls
defer util.CloseRequest(r)
var v s3.OwnershipControls
err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "")
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
@ -417,12 +422,11 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt
}
printOwnership := true
ownership := *v.Rules[0].ObjectOwnership
switch ownership {
newObjectOwnership := *v.Rules[0].ObjectOwnership
switch newObjectOwnership {
case s3_constants.OwnershipObjectWriter:
case s3_constants.OwnershipBucketOwnerPreferred:
case s3_constants.OwnershipBucketOwnerEnforced:
printOwnership = false
default:
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
@ -439,29 +443,34 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt
}
oldOwnership, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey]
if !ok || string(oldOwnership) != ownership {
if !ok || string(oldOwnership) != newObjectOwnership {
// must reset bucket acl to default(bucket owner with full control permission) before setting ownership
// to `OwnershipBucketOwnerEnforced` (bucket cannot have ACLs set with ObjectOwnership's BucketOwnerEnforced setting)
if ownership == s3_constants.OwnershipBucketOwnerEnforced {
acpGrants := s3acl.GetAcpGrants(bucketEntry.Extended)
if len(acpGrants) != 1 {
if newObjectOwnership == s3_constants.OwnershipBucketOwnerEnforced {
acpGrants := s3acl.GetAcpGrants(nil, bucketEntry.Extended)
if len(acpGrants) > 1 {
s3err.WriteErrorResponse(w, r, s3err.InvalidBucketAclWithObjectOwnership)
return
}
} else if len(acpGrants) == 1 {
bucketOwner := s3acl.GetAcpOwner(bucketEntry.Extended, s3account.AccountAdmin.Id)
expectGrant := s3acl.GrantWithFullControl(bucketOwner)
if s3acl.GrantEquals(acpGrants[0], expectGrant) {
if !s3acl.GrantEquals(acpGrants[0], expectGrant) {
s3err.WriteErrorResponse(w, r, s3err.InvalidBucketAclWithObjectOwnership)
return
}
}
}
if bucketEntry.Extended == nil {
bucketEntry.Extended = make(map[string][]byte)
}
bucketEntry.Extended[s3_constants.ExtOwnershipKey] = []byte(ownership)
//update local cache
bucketMetadata, eCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if eCode == s3err.ErrNone {
bucketMetadata.ObjectOwnership = newObjectOwnership
}
bucketEntry.Extended[s3_constants.ExtOwnershipKey] = []byte(newObjectOwnership)
err = s3a.updateEntry(s3a.option.BucketsPath, bucketEntry)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)

14
weed/s3api/s3api_object_handlers.go

@ -219,12 +219,16 @@ func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Reque
}
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
destUrl := s3a.toFilerUrl(bucket, object)
errCode := s3a.checkBucketAccessForReadObject(r, bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
destUrl := s3a.toFilerUrl(bucket, object)
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
}
@ -592,20 +596,18 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
accountId := s3acl.GetAccountId(r)
bucketMetadata, objectEntry, objectOwner, errCode := s3a.checkAccessForWriteObjectAcl(accountId, bucket, object)
objectEntry, ownerId, grants, errCode := s3a.checkAccessForWriteObjectAcl(r, bucket, object)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
grants, errCode := s3acl.ExtractAcl(r, s3a.accountManager, bucketMetadata.ObjectOwnership, *bucketMetadata.Owner.ID, objectOwner, accountId)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
errCode = s3acl.AssembleEntryWithAcp(objectEntry, objectOwner, grants)
errCode = s3acl.AssembleEntryWithAcp(objectEntry, ownerId, grants)
if errCode != s3err.ErrNone {
return
}

63
weed/s3api/s3api_object_multipart_handlers.go

@ -30,8 +30,7 @@ const (
func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
//acl
errCode := s3a.CheckAccessForNewMultipartUpload(r, bucket, object)
errCode, initiatorId := s3a.CheckAccessForNewMultipartUpload(r, bucket, object)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@ -52,7 +51,7 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
if contentType != "" {
createMultipartUploadInput.ContentType = &contentType
}
response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
response, errCode := s3a.createMultipartUpload(initiatorId, createMultipartUploadInput)
glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
@ -70,7 +69,6 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
bucket, object := s3_constants.GetBucketAndObject(r)
s3a.CheckAccessForCompleteMultipartUpload(r, bucket, object)
parts := &CompleteMultipartUpload{}
if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
@ -86,6 +84,12 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
return
}
errCode := s3a.CheckAccessForCompleteMultipartUpload(r, bucket, uploadID)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)),
@ -115,6 +119,24 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
return
}
errCode := s3a.CheckAccessForAbortMultipartUpload(r, bucket, uploadID)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
if err != nil {
glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
if !exists {
glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)),
@ -151,7 +173,12 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
}
}
response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
bucketMetaData, errorCode := s3a.checkAccessForReadBucket(r, bucket, s3_constants.PermissionRead)
if errorCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errorCode)
return
}
response, errCode := s3a.listMultipartUploads(bucketMetaData, &s3.ListMultipartUploadsInput{
Bucket: aws.String(bucket),
Delimiter: aws.String(delimiter),
EncodingType: aws.String(encodingType),
@ -193,6 +220,23 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
return
}
errCode := s3a.CheckAccessForListMultipartUploadParts(r, bucket, uploadID)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
if err != nil {
glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
if !exists {
glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)),
@ -253,6 +297,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
defer dataReader.Close()
errorCode := s3a.CheckAccessForPutObjectPartHandler(r, bucket)
if errorCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errorCode)
return
}
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
@ -260,9 +310,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
if partID == 1 && r.Header.Get("Content-Type") == "" {
dataReader = mimeDetect(r, dataReader)
}
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return

10
weed/s3api/s3api_server.go

@ -128,15 +128,15 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// PutObjectPart
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
// AbortMultipartUpload
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
// ListObjectParts
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
// ListMultipartUploads
bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
bucket.Methods("GET").HandlerFunc(track(s3a.Auth(withAcl(s3a.cb.Limit, s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
// GetObjectTagging
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")

14
weed/s3api/s3err/s3api_errors.go

@ -111,6 +111,8 @@ const (
OwnershipControlsNotFoundError
InvalidBucketAclWithObjectOwnership
AccessControlListNotSupported
ErrUnexpectedContent
ErrInvalidAclArgument
)
// error code to APIError structure, these fields carry respective
@ -426,13 +428,23 @@ var errorCodeResponse = map[ErrorCode]APIError{
InvalidBucketAclWithObjectOwnership: {
Code: "InvalidBucketAclWithObjectOwnership",
Description: "Bucket cannot have ACLs set with ObjectOwnership's BucketOwnerEnforced setting",
HTTPStatusCode: http.StatusNotFound,
HTTPStatusCode: http.StatusBadRequest,
},
AccessControlListNotSupported: {
Code: "AccessControlListNotSupported",
Description: "The bucket does not allow ACLs",
HTTPStatusCode: http.StatusBadRequest,
},
ErrUnexpectedContent: {
Code: "UnexpectedContent",
Description: "This request does not support content",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidAclArgument: {
Code: "InvalidArgument",
Description: "ACL argument is invalid",
HTTPStatusCode: http.StatusBadRequest,
},
}
// GetAPIError provides API Error for input API error code.

15
weed/server/filer_server_handlers_read.go

@ -12,7 +12,7 @@ import (
"mime"
"net/http"
"path/filepath"
"strconv"
strconv "strconv"
"strings"
"time"
@ -99,7 +99,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if err == filer_pb.ErrNotFound {
glog.V(2).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
if r.Header.Get(s3_constants.XSeaweedFSHeaderAmzBucketAccessDenied) == "true" {
w.WriteHeader(http.StatusForbidden)
} else {
w.WriteHeader(http.StatusNotFound)
}
} else {
glog.Errorf("Internal %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
@ -174,10 +178,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
// print out the header from extended properties
for k, v := range entry.Extended {
if !strings.HasPrefix(k, "xattr-") {
if strings.HasPrefix(k, "xattr-") {
// "xattr-" prefix is set in filesys.XATTR_PREFIX
w.Header().Set(k, string(v))
continue
}
if strings.HasPrefix(k, "Seaweed-X-") {
// key with "Seaweed-X-" prefix is builtin and should not expose to user
continue
}
w.Header().Set(k, string(v))
}
//Seaweed custom header are not visible to Vue or javascript

Loading…
Cancel
Save