diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index d567bb5d7..377cf2728 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -9,7 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) -func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) { +func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { @@ -28,6 +28,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la _ = s3a.onIamConfigUpdate(dir, fileName, content) _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content) + _ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry) return nil } @@ -35,7 +36,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la var clientEpoch int32 util.RetryForever("followIamChanges", func() error { clientEpoch++ - return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, nil, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError) + return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, directoriesToWatch, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError) }, func(err error) bool { glog.V(0).Infof("iam follow metadata changes: %v", err) return true @@ -63,3 +64,17 @@ func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, conte } return nil } + +//reload bucket metadata +func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { + if dir == s3a.option.BucketsPath { + if newEntry != nil { + s3a.bucketRegistry.LoadBucketMetadata(newEntry) + glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry) + } else { + s3a.bucketRegistry.RemoveBucketMetadata(oldEntry) + glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry) + } + } + return nil +} diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go new file mode 100644 index 000000000..1b9b09981 --- /dev/null +++ b/weed/s3api/bucket_metadata.go @@ -0,0 +1,213 @@ +package s3api + +import ( + "bytes" + "encoding/json" + "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + + //"github.com/seaweedfs/seaweedfs/weed/s3api" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util" + "math" + "sync" +) + +var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) { + entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName)) + if err != nil { + return nil, err + } + + return buildBucketMetadata(entry), nil +} + +type BucketMetaData struct { + _ struct{} `type:"structure"` + + Name string + + //By default, when another AWS account uploads an object to S3 bucket, + //that account (the object writer) owns the object, has access to it, and + //can grant other users access to it through ACLs. You can use Object Ownership + //to change this default behavior so that ACLs are disabled and you, as the + //bucket owner, automatically own every object in your bucket. + ObjectOwnership string + + // Container for the bucket owner's display name and ID. + Owner *s3.Owner `type:"structure"` + + // A list of grants for access controls. + Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"` +} + +type BucketRegistry struct { + metadataCache map[string]*BucketMetaData + metadataCacheLock sync.RWMutex + + notFound map[string]struct{} + notFoundLock sync.RWMutex + s3a *S3ApiServer +} + +func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry { + br := &BucketRegistry{ + metadataCache: make(map[string]*BucketMetaData), + notFound: make(map[string]struct{}), + s3a: s3a, + } + err := br.init() + if err != nil { + glog.Fatal("init bucket registry failed", err) + return nil + } + return br +} + +func (r *BucketRegistry) init() error { + err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { + r.LoadBucketMetadata(entry) + return nil + }, "", false, math.MaxUint32) + return err +} + +func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) { + bucketMetadata := buildBucketMetadata(entry) + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + r.metadataCache[entry.Name] = bucketMetadata +} + +func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData { + entryJson, _ := json.Marshal(entry) + glog.V(3).Infof("build bucket metadata,entry=%s", entryJson) + bucketMetadata := &BucketMetaData{ + Name: entry.Name, + + //Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled + ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced, + + // Default owner: `AccountAdmin` + Owner: &s3.Owner{ + ID: &AccountAdmin.Id, + DisplayName: &AccountAdmin.Name, + }, + } + if entry.Extended != nil { + //ownership control + ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey] + if ok { + ownership := string(ownership) + valid := s3_constants.ValidateOwnership(ownership) + if valid { + bucketMetadata.ObjectOwnership = ownership + } else { + glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name) + } + } + + //access control policy + acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey] + if ok { + var acp s3.AccessControlPolicy + err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes)) + if err == nil { + //validate owner + if acp.Owner != nil && acp.Owner.ID != nil { + bucketMetadata.Owner = acp.Owner + } else { + glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name) + } + + //acl + bucketMetadata.Acl = acp.Grants + } else { + glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name) + } + } + } + return bucketMetadata +} + +func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) { + r.removeMetadataCache(entry.Name) + r.unMarkNotFound(entry.Name) +} + +func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) { + r.metadataCacheLock.RLock() + bucketMetadata, ok := r.metadataCache[bucketName] + r.metadataCacheLock.RUnlock() + if ok { + return bucketMetadata, s3err.ErrNone + } + + r.notFoundLock.RLock() + _, ok = r.notFound[bucketName] + r.notFoundLock.RUnlock() + if ok { + return nil, s3err.ErrNoSuchBucket + } + + bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName) + if errCode != s3err.ErrNone { + return nil, errCode + } + + r.setMetadataCache(bucketMetadata) + r.unMarkNotFound(bucketName) + return bucketMetadata, s3err.ErrNone +} + +func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + + //check if already exists + r.metadataCacheLock.RLock() + bucketMetaData, ok := r.metadataCache[bucketName] + r.metadataCacheLock.RUnlock() + if ok { + return bucketMetaData, s3err.ErrNone + } + + //if not exists, load from filer + bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName) + if err != nil { + if err == filer_pb.ErrNotFound { + // The bucket doesn't actually exist and should no longer loaded from the filer + r.notFound[bucketName] = struct{}{} + return nil, s3err.ErrNoSuchBucket + } + return nil, s3err.ErrInternalError + } + return bucketMetadata, s3err.ErrNone +} + +func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) { + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + r.metadataCache[metadata.Name] = metadata +} + +func (r *BucketRegistry) removeMetadataCache(bucket string) { + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + delete(r.metadataCache, bucket) +} + +func (r *BucketRegistry) markNotFound(bucket string) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + r.notFound[bucket] = struct{}{} +} + +func (r *BucketRegistry) unMarkNotFound(bucket string) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + delete(r.notFound, bucket) +} diff --git a/weed/s3api/bucket_metadata_test.go b/weed/s3api/bucket_metadata_test.go new file mode 100644 index 000000000..f3c3610cc --- /dev/null +++ b/weed/s3api/bucket_metadata_test.go @@ -0,0 +1,236 @@ +package s3api + +import ( + "fmt" + "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "reflect" + "sync" + "testing" + "time" +) + +type BucketMetadataTestCase struct { + filerEntry *filer_pb.Entry + expectBucketMetadata *BucketMetaData +} + +var ( + //bad entry + badEntry = &filer_pb.Entry{ + Name: "badEntry", + } + + //good entry + goodEntryAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Grants: s3_constants.PublicRead, + }) + goodEntry = &filer_pb.Entry{ + Name: "entryWithValidAcp", + Extended: map[string][]byte{ + s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced), + s3_constants.ExtAcpKey: goodEntryAcp, + }, + } + + //ownership is "" + ownershipEmptyStr = &filer_pb.Entry{ + Name: "ownershipEmptyStr", + Extended: map[string][]byte{ + s3_constants.ExtOwnershipKey: []byte(""), + }, + } + + //ownership valid + ownershipValid = &filer_pb.Entry{ + Name: "ownershipValid", + Extended: map[string][]byte{ + s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced), + }, + } + + //acp is "" + acpEmptyStr = &filer_pb.Entry{ + Name: "acpEmptyStr", + Extended: map[string][]byte{ + s3_constants.ExtAcpKey: []byte(""), + }, + } + + //acp is empty object + acpEmptyObjectAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ + Owner: nil, + Grants: nil, + }) + acpEmptyObject = &filer_pb.Entry{ + Name: "acpEmptyObject", + Extended: map[string][]byte{ + s3_constants.ExtAcpKey: acpEmptyObjectAcp, + }, + } + + //acp owner is nil + acpOwnerNilAcp, _ = jsonutil.BuildJSON(&s3.AccessControlPolicy{ + Owner: nil, + Grants: make([]*s3.Grant, 1), + }) + acpOwnerNil = &filer_pb.Entry{ + Name: "acpOwnerNil", + Extended: map[string][]byte{ + s3_constants.ExtAcpKey: acpOwnerNilAcp, + }, + } + + //load filer is + loadFilerBucket = make(map[string]int, 1) + //override `loadBucketMetadataFromFiler` to avoid really load from filer +) + +var tcs = []*BucketMetadataTestCase{ + { + badEntry, &BucketMetaData{ + Name: badEntry.Name, + ObjectOwnership: s3_constants.DefaultOwnershipForExists, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: nil, + }, + }, + { + goodEntry, &BucketMetaData{ + Name: goodEntry.Name, + ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: s3_constants.PublicRead, + }, + }, + { + ownershipEmptyStr, &BucketMetaData{ + Name: ownershipEmptyStr.Name, + ObjectOwnership: s3_constants.DefaultOwnershipForExists, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: nil, + }, + }, + { + ownershipValid, &BucketMetaData{ + Name: ownershipValid.Name, + ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: nil, + }, + }, + { + acpEmptyStr, &BucketMetaData{ + Name: acpEmptyStr.Name, + ObjectOwnership: s3_constants.DefaultOwnershipForExists, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: nil, + }, + }, + { + acpEmptyObject, &BucketMetaData{ + Name: acpEmptyObject.Name, + ObjectOwnership: s3_constants.DefaultOwnershipForExists, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: nil, + }, + }, + { + acpOwnerNil, &BucketMetaData{ + Name: acpOwnerNil.Name, + ObjectOwnership: s3_constants.DefaultOwnershipForExists, + Owner: &s3.Owner{ + DisplayName: &AccountAdmin.Name, + ID: &AccountAdmin.Id, + }, + Acl: make([]*s3.Grant, 0), + }, + }, +} + +func TestBuildBucketMetadata(t *testing.T) { + for _, tc := range tcs { + resultBucketMetadata := buildBucketMetadata(tc.filerEntry) + if !reflect.DeepEqual(resultBucketMetadata, tc.expectBucketMetadata) { + t.Fatalf("result is unexpect: \nresult: %v, \nexpect: %v", resultBucketMetadata, tc.expectBucketMetadata) + } + } +} + +func TestGetBucketMetadata(t *testing.T) { + loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) { + time.Sleep(time.Second) + loadFilerBucket[bucketName] = loadFilerBucket[bucketName] + 1 + return &BucketMetaData{ + Name: bucketName, + }, nil + } + + br := &BucketRegistry{ + metadataCache: make(map[string]*BucketMetaData), + notFound: make(map[string]struct{}), + s3a: nil, + } + + //start 40 goroutine for + var wg sync.WaitGroup + closeCh := make(chan struct{}) + for i := 0; i < 40; i++ { + wg.Add(1) + go func() { + defer wg.Done() + outLoop: + for { + for j := 0; j < 5; j++ { + select { + case <-closeCh: + break outLoop + default: + reqBucket := fmt.Sprintf("%c", 67+j) + _, errCode := br.GetBucketMetadata(reqBucket) + if errCode != s3err.ErrNone { + close(closeCh) + t.Error("not expect") + } + } + } + time.Sleep(10 * time.Microsecond) + } + }() + } + time.Sleep(time.Second) + close(closeCh) + wg.Wait() + + //Each bucket is loaded from the filer only once + for bucketName, loadCount := range loadFilerBucket { + if loadCount != 1 { + t.Fatalf("lock is uneffict: %s, %d", bucketName, loadCount) + } + } +} diff --git a/weed/s3api/s3_constants/acp_canned_acl.go b/weed/s3api/s3_constants/acp_canned_acl.go new file mode 100644 index 000000000..eab497872 --- /dev/null +++ b/weed/s3api/s3_constants/acp_canned_acl.go @@ -0,0 +1,65 @@ +package s3_constants + +import ( + "github.com/aws/aws-sdk-go/service/s3" +) + +const ( + CannedAclPrivate = "private" + CannedAclPublicRead = "public-read" + CannedAclPublicReadWrite = "public-read-write" + CannedAclAuthenticatedRead = "authenticated-read" + CannedAclLogDeliveryWrite = "log-delivery-write" + CannedAclBucketOwnerRead = "bucket-owner-read" + CannedAclBucketOwnerFullControl = "bucket-owner-full-control" + CannedAclAwsExecRead = "aws-exec-read" +) + +var ( + PublicRead = []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &GrantTypeGroup, + URI: &GranteeGroupAllUsers, + }, + Permission: &PermissionRead, + }, + } + + PublicReadWrite = []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &GrantTypeGroup, + URI: &GranteeGroupAllUsers, + }, + Permission: &PermissionRead, + }, + { + Grantee: &s3.Grantee{ + Type: &GrantTypeGroup, + URI: &GranteeGroupAllUsers, + }, + Permission: &PermissionWrite, + }, + } + + AuthenticatedRead = []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &GrantTypeGroup, + URI: &GranteeGroupAuthenticatedUsers, + }, + Permission: &PermissionRead, + }, + } + + LogDeliveryWrite = []*s3.Grant{ + { + Grantee: &s3.Grantee{ + Type: &GrantTypeGroup, + URI: &GranteeGroupLogDelivery, + }, + Permission: &PermissionWrite, + }, + } +) diff --git a/weed/s3api/s3_constants/acp_grantee_group.go b/weed/s3api/s3_constants/acp_grantee_group.go new file mode 100644 index 000000000..a315fb0f7 --- /dev/null +++ b/weed/s3api/s3_constants/acp_grantee_group.go @@ -0,0 +1,8 @@ +package s3_constants + +//Amazon S3 predefined groups +var ( + GranteeGroupAllUsers = "http://acs.amazonaws.com/groups/global/AllUsers" + GranteeGroupAuthenticatedUsers = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" + GranteeGroupLogDelivery = "http://acs.amazonaws.com/groups/s3/LogDelivery" +) diff --git a/weed/s3api/s3_constants/acp_grantee_type.go b/weed/s3api/s3_constants/acp_grantee_type.go new file mode 100644 index 000000000..7a4dfaf16 --- /dev/null +++ b/weed/s3api/s3_constants/acp_grantee_type.go @@ -0,0 +1,7 @@ +package s3_constants + +var ( + GrantTypeCanonicalUser = "CanonicalUser" + GrantTypeAmazonCustomerByEmail = "AmazonCustomerByEmail" + GrantTypeGroup = "Group" +) diff --git a/weed/s3api/s3_constants/acp_ownership.go b/weed/s3api/s3_constants/acp_ownership.go new file mode 100644 index 000000000..e11e95935 --- /dev/null +++ b/weed/s3api/s3_constants/acp_ownership.go @@ -0,0 +1,18 @@ +package s3_constants + +var ( + OwnershipBucketOwnerPreferred = "BucketOwnerPreferred" + OwnershipObjectWriter = "ObjectWriter" + OwnershipBucketOwnerEnforced = "BucketOwnerEnforced" + + DefaultOwnershipForCreate = OwnershipObjectWriter + DefaultOwnershipForExists = OwnershipBucketOwnerEnforced +) + +func ValidateOwnership(ownership string) bool { + if ownership == "" || (ownership != OwnershipBucketOwnerPreferred && ownership != OwnershipObjectWriter && ownership != OwnershipBucketOwnerEnforced) { + return false + } else { + return true + } +} diff --git a/weed/s3api/s3_constants/acp_permisson.go b/weed/s3api/s3_constants/acp_permisson.go new file mode 100644 index 000000000..4b875ff49 --- /dev/null +++ b/weed/s3api/s3_constants/acp_permisson.go @@ -0,0 +1,9 @@ +package s3_constants + +var ( + PermissionFullControl = "FULL_CONTROL" + PermissionRead = "READ" + PermissionWrite = "WRITE" + PermissionReadAcp = "READ_ACP" + PermissionWriteAcp = "WRITE_ACP" +) diff --git a/weed/s3api/s3_constants/extend_key.go b/weed/s3api/s3_constants/extend_key.go new file mode 100644 index 000000000..10b69979e --- /dev/null +++ b/weed/s3api/s3_constants/extend_key.go @@ -0,0 +1,6 @@ +package s3_constants + +const ( + ExtAcpKey = "Seaweed-X-Amz-Acp" + ExtOwnershipKey = "Seaweed-X-Amz-Ownership" +) diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 7ed5d4e87..e94611d6a 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -41,6 +41,7 @@ type S3ApiServer struct { filerGuard *security.Guard client *http.Client accountManager *AccountManager + bucketRegistry *BucketRegistry } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -61,6 +62,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer cb: NewCircuitBreaker(option), } s3ApiServer.accountManager = NewAccountManager(s3ApiServer) + s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer) if option.LocalFilerSocket == "" { s3ApiServer.client = &http.Client{Transport: &http.Transport{ MaxIdleConns: 1024, @@ -78,7 +80,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer s3ApiServer.registerRouter(router) - go s3ApiServer.subscribeMetaEvents("s3", filer.DirectoryEtcRoot, time.Now().UnixNano()) + go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath}) return s3ApiServer, nil } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 89941b340..96e5018da 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + //"github.com/seaweedfs/seaweedfs/weed/s3api" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "io" "net/http" @@ -374,6 +375,12 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool } } + //acp + acp := r.Header.Get(s3_constants.ExtAcpKey) + if len(acp) > 0 { + metadata[s3_constants.ExtAcpKey] = []byte(acp) + } + return }