Browse Source

Merge branch '6_object_read' into 10_acl_merged

# Conflicts:
#	weed/s3api/s3acl/acl_helper.go
#	weed/s3api/s3api_acp.go
pull/4090/head
changlin.shi 2 years ago
parent
commit
a05785edc9
  1. 1
      weed/s3api/s3_constants/header.go
  2. 36
      weed/s3api/s3acl/acl_helper.go
  3. 20
      weed/s3api/s3api_acp.go
  4. 17
      weed/s3api/s3api_object_handlers.go
  5. 10
      weed/server/filer_server_handlers_read.go

1
weed/s3api/s3_constants/header.go

@ -38,6 +38,7 @@ const (
AmzTagCount = "x-amz-tagging-count"
X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key"
XSeaweedFSHeaderAmzBucketOwnerId = "x-seaweedfs-amz-bucket-owner-id"
// S3 ACL headers
AmzCannedAcl = "X-Amz-Acl"

36
weed/s3api/s3acl/acl_helper.go

@ -5,6 +5,7 @@ import (
"encoding/xml"
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@ -517,3 +518,38 @@ func GrantWithFullControl(accountId string) *s3.Grant {
},
}
}
func CheckObjectAccessForReadObject(r *http.Request, w http.ResponseWriter, entry *filer.Entry, bucketOwnerId string) (statusCode int, ok bool) {
if entry.IsDirectory() {
w.Header().Set(s3_constants.X_SeaweedFS_Header_Directory_Key, "true")
return http.StatusMethodNotAllowed, false
}
accountId := GetAccountId(r)
if len(accountId) == 0 {
glog.Warning("#checkObjectAccessForReadObject header[accountId] not exists!")
return http.StatusForbidden, false
}
//owner access
objectOwner := GetAcpOwner(entry.Extended, bucketOwnerId)
if accountId == objectOwner {
return http.StatusOK, true
}
//find in Grants
acpGrants := GetAcpGrants(entry.Extended)
if acpGrants != nil {
reqGrants := DetermineReqGrants(accountId, s3_constants.PermissionRead)
for _, requiredGrant := range reqGrants {
for _, grant := range acpGrants {
if GrantEquals(requiredGrant, grant) {
return http.StatusOK, true
}
}
}
}
glog.V(3).Infof("acl denied! request account id: %s", accountId)
return http.StatusForbidden, false
}

20
weed/s3api/s3api_acp.go

@ -172,3 +172,23 @@ func (s3a *S3ApiServer) checkAccessForReadObjectAcl(r *http.Request, bucket, obj
func getObjectEntry(s3a *S3ApiServer, bucket, object string) (*filer_pb.Entry, error) {
return s3a.getEntry(util.Join(s3a.option.BucketsPath, bucket), object)
}
// Check Object-Read related access
// includes:
// - GetObjectHandler
//
// offload object access validation to Filer layer
// - s3acl.CheckObjectAccessForReadObject
func (s3a *S3ApiServer) checkBucketAccessForReadObject(r *http.Request, bucket string) s3err.ErrorCode {
bucketMetadata, errCode := s3a.bucketRegistry.GetBucketMetadata(bucket)
if errCode != s3err.ErrNone {
return errCode
}
if bucketMetadata.ObjectOwnership != s3_constants.OwnershipBucketOwnerEnforced {
//offload object acl validation to filer layer
r.Header.Set(s3_constants.XSeaweedFSHeaderAmzBucketOwnerId, *bucketMetadata.Owner.ID)
}
return s3err.ErrNone
}

17
weed/s3api/s3api_object_handlers.go

@ -165,6 +165,12 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
errCode := s3a.checkBucketAccessForReadObject(r, bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
if strings.HasSuffix(r.URL.Path, "/") {
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
@ -387,14 +393,17 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
}
defer util.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
return
}
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
case http.StatusRequestedRangeNotSatisfiable:
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return
case http.StatusForbidden:
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
default:
}
if r.Method == "DELETE" {

10
weed/server/filer_server_handlers_read.go

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3acl"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"math"
@ -107,6 +108,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
//s3 acl offload to filer
offloadHeaderBucketOwner := r.Header.Get(s3_constants.XSeaweedFSHeaderAmzBucketOwnerId)
if len(offloadHeaderBucketOwner) > 0 {
if statusCode, ok := s3acl.CheckObjectAccessForReadObject(r, w, entry, offloadHeaderBucketOwner); !ok {
w.WriteHeader(statusCode)
return
}
}
query := r.URL.Query()
if entry.IsDirectory() {

Loading…
Cancel
Save