You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

227 lines
5.4 KiB

  1. package s3api
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/aws/aws-sdk-go/service/s3"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  10. "reflect"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. type BucketMetadataTestCase struct {
  16. filerEntry *filer_pb.Entry
  17. expectBucketMetadata *BucketMetaData
  18. }
  19. var (
  20. //bad entry
  21. badEntry = &filer_pb.Entry{
  22. Name: "badEntry",
  23. }
  24. //good entry
  25. goodEntryAcl, _ = json.Marshal(s3_constants.PublicRead)
  26. goodEntry = &filer_pb.Entry{
  27. Name: "entryWithValidAcp",
  28. Extended: map[string][]byte{
  29. s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
  30. s3_constants.ExtAmzOwnerKey: []byte(AccountAdmin.DisplayName),
  31. s3_constants.ExtAmzAclKey: goodEntryAcl,
  32. },
  33. }
  34. //ownership is ""
  35. ownershipEmptyStr = &filer_pb.Entry{
  36. Name: "ownershipEmptyStr",
  37. Extended: map[string][]byte{
  38. s3_constants.ExtOwnershipKey: []byte(""),
  39. },
  40. }
  41. //ownership valid
  42. ownershipValid = &filer_pb.Entry{
  43. Name: "ownershipValid",
  44. Extended: map[string][]byte{
  45. s3_constants.ExtOwnershipKey: []byte(s3_constants.OwnershipBucketOwnerEnforced),
  46. },
  47. }
  48. //owner is ""
  49. acpEmptyStr = &filer_pb.Entry{
  50. Name: "acpEmptyStr",
  51. Extended: map[string][]byte{
  52. s3_constants.ExtAmzOwnerKey: []byte(""),
  53. },
  54. }
  55. //owner not exists
  56. acpEmptyObject = &filer_pb.Entry{
  57. Name: "acpEmptyObject",
  58. Extended: map[string][]byte{
  59. s3_constants.ExtAmzOwnerKey: []byte("xxxxx"),
  60. },
  61. }
  62. //grants is nil
  63. acpOwnerNilAcp, _ = json.Marshal(make([]*s3.Grant, 0))
  64. acpOwnerNil = &filer_pb.Entry{
  65. Name: "acpOwnerNil",
  66. Extended: map[string][]byte{
  67. s3_constants.ExtAmzAclKey: acpOwnerNilAcp,
  68. },
  69. }
  70. //load filer is
  71. loadFilerBucket = make(map[string]int, 1)
  72. //override `loadBucketMetadataFromFiler` to avoid really load from filer
  73. )
  74. var tcs = []*BucketMetadataTestCase{
  75. {
  76. badEntry, &BucketMetaData{
  77. Name: badEntry.Name,
  78. ObjectOwnership: s3_constants.DefaultOwnershipForExists,
  79. Owner: &s3.Owner{
  80. DisplayName: &AccountAdmin.DisplayName,
  81. ID: &AccountAdmin.Id,
  82. },
  83. Acl: nil,
  84. },
  85. },
  86. {
  87. goodEntry, &BucketMetaData{
  88. Name: goodEntry.Name,
  89. ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
  90. Owner: &s3.Owner{
  91. DisplayName: &AccountAdmin.DisplayName,
  92. ID: &AccountAdmin.Id,
  93. },
  94. Acl: s3_constants.PublicRead,
  95. },
  96. },
  97. {
  98. ownershipEmptyStr, &BucketMetaData{
  99. Name: ownershipEmptyStr.Name,
  100. ObjectOwnership: s3_constants.DefaultOwnershipForExists,
  101. Owner: &s3.Owner{
  102. DisplayName: &AccountAdmin.DisplayName,
  103. ID: &AccountAdmin.Id,
  104. },
  105. Acl: nil,
  106. },
  107. },
  108. {
  109. ownershipValid, &BucketMetaData{
  110. Name: ownershipValid.Name,
  111. ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
  112. Owner: &s3.Owner{
  113. DisplayName: &AccountAdmin.DisplayName,
  114. ID: &AccountAdmin.Id,
  115. },
  116. Acl: nil,
  117. },
  118. },
  119. {
  120. acpEmptyStr, &BucketMetaData{
  121. Name: acpEmptyStr.Name,
  122. ObjectOwnership: s3_constants.DefaultOwnershipForExists,
  123. Owner: &s3.Owner{
  124. DisplayName: &AccountAdmin.DisplayName,
  125. ID: &AccountAdmin.Id,
  126. },
  127. Acl: nil,
  128. },
  129. },
  130. {
  131. acpEmptyObject, &BucketMetaData{
  132. Name: acpEmptyObject.Name,
  133. ObjectOwnership: s3_constants.DefaultOwnershipForExists,
  134. Owner: &s3.Owner{
  135. DisplayName: &AccountAdmin.DisplayName,
  136. ID: &AccountAdmin.Id,
  137. },
  138. Acl: nil,
  139. },
  140. },
  141. {
  142. acpOwnerNil, &BucketMetaData{
  143. Name: acpOwnerNil.Name,
  144. ObjectOwnership: s3_constants.DefaultOwnershipForExists,
  145. Owner: &s3.Owner{
  146. DisplayName: &AccountAdmin.DisplayName,
  147. ID: &AccountAdmin.Id,
  148. },
  149. Acl: make([]*s3.Grant, 0),
  150. },
  151. },
  152. }
  153. func TestBuildBucketMetadata(t *testing.T) {
  154. iam := &IdentityAccessManagement{}
  155. _ = iam.loadS3ApiConfiguration(&iam_pb.S3ApiConfiguration{})
  156. for _, tc := range tcs {
  157. resultBucketMetadata := buildBucketMetadata(iam, tc.filerEntry)
  158. if !reflect.DeepEqual(resultBucketMetadata, tc.expectBucketMetadata) {
  159. t.Fatalf("result is unexpect: \nresult: %v, \nexpect: %v", resultBucketMetadata, tc.expectBucketMetadata)
  160. }
  161. }
  162. }
  163. func TestGetBucketMetadata(t *testing.T) {
  164. loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
  165. time.Sleep(time.Second)
  166. loadFilerBucket[bucketName] = loadFilerBucket[bucketName] + 1
  167. return &BucketMetaData{
  168. Name: bucketName,
  169. }, nil
  170. }
  171. br := &BucketRegistry{
  172. metadataCache: make(map[string]*BucketMetaData),
  173. notFound: make(map[string]struct{}),
  174. s3a: nil,
  175. }
  176. //start 40 goroutine for
  177. var wg sync.WaitGroup
  178. closeCh := make(chan struct{})
  179. for i := 0; i < 40; i++ {
  180. wg.Add(1)
  181. go func() {
  182. defer wg.Done()
  183. outLoop:
  184. for {
  185. for j := 0; j < 5; j++ {
  186. select {
  187. case <-closeCh:
  188. break outLoop
  189. default:
  190. reqBucket := fmt.Sprintf("%c", 67+j)
  191. _, errCode := br.GetBucketMetadata(reqBucket)
  192. if errCode != s3err.ErrNone {
  193. close(closeCh)
  194. t.Error("not expect")
  195. }
  196. }
  197. }
  198. time.Sleep(10 * time.Microsecond)
  199. }
  200. }()
  201. }
  202. time.Sleep(time.Second)
  203. close(closeCh)
  204. wg.Wait()
  205. //Each bucket is loaded from the filer only once
  206. for bucketName, loadCount := range loadFilerBucket {
  207. if loadCount != 1 {
  208. t.Fatalf("lock is uneffict: %s, %d", bucketName, loadCount)
  209. }
  210. }
  211. }