216 lines
6.2 KiB

  1. package s3api
import (
	"encoding/json"
	"errors"
	"math"
	"sync"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
  13. var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
  14. entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName))
  15. if err != nil {
  16. return nil, err
  17. }
  18. return buildBucketMetadata(r.s3a.iam, entry), nil
  19. }
// BucketMetaData is the in-memory description of a bucket's ownership and
// access-control settings, as produced by buildBucketMetadata.
type BucketMetaData struct {
	_ struct{} `type:"structure"`
	// Name is the bucket name; it is also the key used in the registry cache.
	Name string
	// By default, when another AWS account uploads an object to an S3 bucket,
	// that account (the object writer) owns the object, has access to it, and
	// can grant other users access to it through ACLs. Object Ownership can be
	// used to change this default behavior so that ACLs are disabled and the
	// bucket owner automatically owns every object in the bucket.
	//
	// buildBucketMetadata defaults this to
	// s3_constants.OwnershipBucketOwnerEnforced.
	ObjectOwnership string
	// Container for the bucket owner's display name and ID.
	Owner *s3.Owner `type:"structure"`
	// A list of grants for access controls.
	Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"`
}
// BucketRegistry keeps bucket metadata in memory and remembers which bucket
// names were looked up on the filer and reported as not found.
type BucketRegistry struct {
	// metadataCache maps a bucket name to its metadata; guarded by
	// metadataCacheLock.
	metadataCache     map[string]*BucketMetaData
	metadataCacheLock sync.RWMutex
	// notFound is the set of bucket names the filer reported as missing;
	// guarded by notFoundLock.
	notFound     map[string]struct{}
	notFoundLock sync.RWMutex
	// s3a is the S3 API server this registry uses for filer access, options
	// and account lookups.
	s3a *S3ApiServer
}
  41. func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry {
  42. br := &BucketRegistry{
  43. metadataCache: make(map[string]*BucketMetaData),
  44. notFound: make(map[string]struct{}),
  45. s3a: s3a,
  46. }
  47. err := br.init()
  48. if err != nil {
  49. glog.Fatal("init bucket registry failed", err)
  50. return nil
  51. }
  52. return br
  53. }
  54. func (r *BucketRegistry) init() error {
  55. err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  56. r.LoadBucketMetadata(entry)
  57. return nil
  58. }, "", false, math.MaxUint32)
  59. return err
  60. }
  61. func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
  62. bucketMetadata := buildBucketMetadata(r.s3a.iam, entry)
  63. r.metadataCacheLock.Lock()
  64. defer r.metadataCacheLock.Unlock()
  65. r.metadataCache[entry.Name] = bucketMetadata
  66. }
  67. func buildBucketMetadata(accountManager AccountManager, entry *filer_pb.Entry) *BucketMetaData {
  68. entryJson, _ := json.Marshal(entry)
  69. glog.V(3).Infof("build bucket metadata,entry=%s", entryJson)
  70. bucketMetadata := &BucketMetaData{
  71. Name: entry.Name,
  72. //Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled
  73. ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
  74. // Default owner: `AccountAdmin`
  75. Owner: &s3.Owner{
  76. ID: &AccountAdmin.Id,
  77. DisplayName: &AccountAdmin.DisplayName,
  78. },
  79. }
  80. if entry.Extended != nil {
  81. //ownership control
  82. ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey]
  83. if ok {
  84. ownership := string(ownership)
  85. valid := s3_constants.ValidateOwnership(ownership)
  86. if valid {
  87. bucketMetadata.ObjectOwnership = ownership
  88. } else {
  89. glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name)
  90. }
  91. }
  92. //access control policy
  93. //owner
  94. acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey]
  95. if ok && len(acpOwnerBytes) > 0 {
  96. ownerAccountId := string(acpOwnerBytes)
  97. ownerAccountName := accountManager.GetAccountNameById(ownerAccountId)
  98. if ownerAccountName == "" {
  99. glog.Warningf("owner[id=%s] is invalid, bucket: %s", ownerAccountId, bucketMetadata.Name)
  100. } else {
  101. bucketMetadata.Owner = &s3.Owner{
  102. ID: &ownerAccountId,
  103. DisplayName: &ownerAccountName,
  104. }
  105. }
  106. }
  107. //grants
  108. acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey]
  109. if ok && len(acpGrantsBytes) > 0 {
  110. var grants []*s3.Grant
  111. err := json.Unmarshal(acpGrantsBytes, &grants)
  112. if err == nil {
  113. bucketMetadata.Acl = grants
  114. } else {
  115. glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name)
  116. }
  117. }
  118. }
  119. return bucketMetadata
  120. }
  121. func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
  122. r.removeMetadataCache(entry.Name)
  123. r.unMarkNotFound(entry.Name)
  124. }
  125. func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
  126. r.metadataCacheLock.RLock()
  127. bucketMetadata, ok := r.metadataCache[bucketName]
  128. r.metadataCacheLock.RUnlock()
  129. if ok {
  130. return bucketMetadata, s3err.ErrNone
  131. }
  132. r.notFoundLock.RLock()
  133. _, ok = r.notFound[bucketName]
  134. r.notFoundLock.RUnlock()
  135. if ok {
  136. return nil, s3err.ErrNoSuchBucket
  137. }
  138. bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName)
  139. if errCode != s3err.ErrNone {
  140. return nil, errCode
  141. }
  142. r.setMetadataCache(bucketMetadata)
  143. r.unMarkNotFound(bucketName)
  144. return bucketMetadata, s3err.ErrNone
  145. }
  146. func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
  147. r.notFoundLock.Lock()
  148. defer r.notFoundLock.Unlock()
  149. //check if already exists
  150. r.metadataCacheLock.RLock()
  151. bucketMetaData, ok := r.metadataCache[bucketName]
  152. r.metadataCacheLock.RUnlock()
  153. if ok {
  154. return bucketMetaData, s3err.ErrNone
  155. }
  156. //if not exists, load from filer
  157. bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName)
  158. if err != nil {
  159. if err == filer_pb.ErrNotFound {
  160. // The bucket doesn't actually exist and should no longer loaded from the filer
  161. r.notFound[bucketName] = struct{}{}
  162. return nil, s3err.ErrNoSuchBucket
  163. }
  164. return nil, s3err.ErrInternalError
  165. }
  166. return bucketMetadata, s3err.ErrNone
  167. }
  168. func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
  169. r.metadataCacheLock.Lock()
  170. defer r.metadataCacheLock.Unlock()
  171. r.metadataCache[metadata.Name] = metadata
  172. }
  173. func (r *BucketRegistry) removeMetadataCache(bucket string) {
  174. r.metadataCacheLock.Lock()
  175. defer r.metadataCacheLock.Unlock()
  176. delete(r.metadataCache, bucket)
  177. }
  178. func (r *BucketRegistry) markNotFound(bucket string) {
  179. r.notFoundLock.Lock()
  180. defer r.notFoundLock.Unlock()
  181. r.notFound[bucket] = struct{}{}
  182. }
  183. func (r *BucketRegistry) unMarkNotFound(bucket string) {
  184. r.notFoundLock.Lock()
  185. defer r.notFoundLock.Unlock()
  186. delete(r.notFound, bucket)
  187. }