You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

238 lines
6.7 KiB

  1. package s3api
  2. import (
  3. "encoding/json"
  4. "github.com/aws/aws-sdk-go/service/s3"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "math"
  11. "sync"
  12. )
  13. var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
  14. entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName))
  15. if err != nil {
  16. return nil, err
  17. }
  18. return buildBucketMetadata(r.s3a.iam, entry), nil
  19. }
  20. type BucketMetaData struct {
  21. _ struct{} `type:"structure"`
  22. Name string
  23. //By default, when another AWS account uploads an object to S3 bucket,
  24. //that account (the object writer) owns the object, has access to it, and
  25. //can grant other users access to it through ACLs. You can use Object Ownership
  26. //to change this default behavior so that ACLs are disabled and you, as the
  27. //bucket owner, automatically own every object in your bucket.
  28. ObjectOwnership string
  29. // Container for the bucket owner's display name and ID.
  30. Owner *s3.Owner `type:"structure"`
  31. // A list of grants for access controls.
  32. Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"`
  33. }
  34. type BucketRegistry struct {
  35. metadataCache map[string]*BucketMetaData
  36. metadataCacheLock sync.RWMutex
  37. notFound map[string]struct{}
  38. notFoundLock sync.RWMutex
  39. s3a *S3ApiServer
  40. }
  41. func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry {
  42. br := &BucketRegistry{
  43. metadataCache: make(map[string]*BucketMetaData),
  44. notFound: make(map[string]struct{}),
  45. s3a: s3a,
  46. }
  47. err := br.init()
  48. if err != nil {
  49. glog.Fatal("init bucket registry failed", err)
  50. return nil
  51. }
  52. return br
  53. }
  54. func (r *BucketRegistry) init() error {
  55. err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  56. r.LoadBucketMetadata(entry)
  57. return nil
  58. }, "", false, math.MaxUint32)
  59. return err
  60. }
  61. func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
  62. bucketMetadata := buildBucketMetadata(r.s3a.iam, entry)
  63. r.metadataCacheLock.Lock()
  64. defer r.metadataCacheLock.Unlock()
  65. r.metadataCache[entry.Name] = bucketMetadata
  66. }
  67. func buildBucketMetadata(accountManager AccountManager, entry *filer_pb.Entry) *BucketMetaData {
  68. entryJson, _ := json.Marshal(entry)
  69. glog.V(3).Infof("build bucket metadata,entry=%s", entryJson)
  70. bucketMetadata := &BucketMetaData{
  71. Name: entry.Name,
  72. //Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled
  73. ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
  74. // Default owner: `AccountAdmin`
  75. Owner: &s3.Owner{
  76. ID: &AccountAdmin.Id,
  77. DisplayName: &AccountAdmin.DisplayName,
  78. },
  79. }
  80. if entry.Extended != nil {
  81. //ownership control
  82. ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey]
  83. if ok {
  84. ownership := string(ownership)
  85. valid := s3_constants.ValidateOwnership(ownership)
  86. if valid {
  87. bucketMetadata.ObjectOwnership = ownership
  88. } else {
  89. glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name)
  90. }
  91. }
  92. //owner
  93. acpOwnerBytes, ok := entry.Extended[s3_constants.ExtAmzOwnerKey]
  94. if ok && len(acpOwnerBytes) > 0 {
  95. ownerAccountId := string(acpOwnerBytes)
  96. ownerAccountName := accountManager.GetAccountNameById(ownerAccountId)
  97. if ownerAccountName == "" {
  98. glog.Warningf("owner[id=%s] is invalid, bucket: %s", ownerAccountId, bucketMetadata.Name)
  99. } else {
  100. bucketMetadata.Owner = &s3.Owner{
  101. ID: &ownerAccountId,
  102. DisplayName: &ownerAccountName,
  103. }
  104. }
  105. }
  106. //grants
  107. acpGrantsBytes, ok := entry.Extended[s3_constants.ExtAmzAclKey]
  108. if ok {
  109. if len(acpGrantsBytes) > 0 {
  110. var grants []*s3.Grant
  111. err := json.Unmarshal(acpGrantsBytes, &grants)
  112. if err == nil {
  113. bucketMetadata.Acl = grants
  114. } else {
  115. glog.Warningf("Unmarshal ACP grants: %s(%v), bucket: %s", string(acpGrantsBytes), err, bucketMetadata.Name)
  116. }
  117. }
  118. } else {
  119. bucketMetadata.Acl = []*s3.Grant{
  120. {
  121. Grantee: &s3.Grantee{
  122. Type: &s3_constants.GrantTypeCanonicalUser,
  123. ID: bucketMetadata.Owner.ID,
  124. },
  125. Permission: &s3_constants.PermissionFullControl,
  126. },
  127. }
  128. }
  129. }
  130. return bucketMetadata
  131. }
  132. func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
  133. r.removeMetadataCache(entry.Name)
  134. r.unMarkNotFound(entry.Name)
  135. }
  136. func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
  137. bucketMetadata := r.getMetadataCache(bucketName)
  138. if bucketMetadata != nil {
  139. return bucketMetadata, s3err.ErrNone
  140. }
  141. if r.isNotFound(bucketName) {
  142. return nil, s3err.ErrNoSuchBucket
  143. }
  144. bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName)
  145. if errCode != s3err.ErrNone {
  146. return nil, errCode
  147. }
  148. r.setMetadataCache(bucketMetadata)
  149. r.unMarkNotFound(bucketName)
  150. return bucketMetadata, s3err.ErrNone
  151. }
  152. func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
  153. r.notFoundLock.Lock()
  154. defer r.notFoundLock.Unlock()
  155. //check if already exists
  156. bucketMetaData := r.getMetadataCache(bucketName)
  157. if bucketMetaData != nil {
  158. return bucketMetaData, s3err.ErrNone
  159. }
  160. //if not exists, load from filer
  161. bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName)
  162. if err != nil {
  163. if err == filer_pb.ErrNotFound {
  164. // The bucket doesn't actually exist and should no longer loaded from the filer
  165. glog.V(4).Info("bucket not found in filer: ", bucketName)
  166. r.notFound[bucketName] = struct{}{}
  167. return nil, s3err.ErrNoSuchBucket
  168. }
  169. return nil, s3err.ErrInternalError
  170. }
  171. return bucketMetadata, s3err.ErrNone
  172. }
  173. func (r *BucketRegistry) getMetadataCache(bucket string) *BucketMetaData {
  174. r.metadataCacheLock.RLock()
  175. defer r.metadataCacheLock.RUnlock()
  176. if cache, ok := r.metadataCache[bucket]; ok {
  177. return cache
  178. }
  179. return nil
  180. }
  181. func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
  182. r.metadataCacheLock.Lock()
  183. defer r.metadataCacheLock.Unlock()
  184. r.metadataCache[metadata.Name] = metadata
  185. }
  186. func (r *BucketRegistry) removeMetadataCache(bucket string) {
  187. r.metadataCacheLock.Lock()
  188. defer r.metadataCacheLock.Unlock()
  189. delete(r.metadataCache, bucket)
  190. }
  191. func (r *BucketRegistry) isNotFound(bucket string) bool {
  192. r.notFoundLock.RLock()
  193. defer r.notFoundLock.RUnlock()
  194. _, ok := r.notFound[bucket]
  195. return ok
  196. }
  197. func (r *BucketRegistry) unMarkNotFound(bucket string) {
  198. r.notFoundLock.Lock()
  199. defer r.notFoundLock.Unlock()
  200. delete(r.notFound, bucket)
  201. }
  202. func (r *BucketRegistry) ClearCache(bucket string) {
  203. r.removeMetadataCache(bucket)
  204. r.unMarkNotFound(bucket)
  205. }