You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
2.6 KiB

  1. package s3api
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
  4. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
  5. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  6. "net/http"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. )
  // TestLimitCase is one row of the table driving TestLimit: the limit to
  // configure, the load to apply, and the number of requests expected to pass.
  type TestLimitCase struct {
  	// actionName and limitType are joined with s3_constants.Concat to form
  	// the key under which the limit values are configured.
  	actionName, limitType string
  	// bucketLimitValue is configured for the test bucket, globalLimitValue
  	// for the global options.
  	bucketLimitValue, globalLimitValue int64
  	// routineCount is how many concurrent limit calls are issued.
  	routineCount int
  	// reqBytes becomes the ContentLength of the request used for every call;
  	// successCount is the expected number of calls that are not rejected.
  	reqBytes, successCount int64
  }
  20. var (
  21. bucket = "/test"
  22. action = s3_constants.ACTION_READ
  23. TestLimitCases = []*TestLimitCase{
  24. {action, s3_constants.LimitTypeCount, 5, 5, 6, 1024, 5},
  25. {action, s3_constants.LimitTypeCount, 6, 6, 6, 1024, 6},
  26. {action, s3_constants.LimitTypeCount, 5, 6, 6, 1024, 5},
  27. {action, s3_constants.LimitTypeBytes, 1024, 1024, 6, 200, 5},
  28. {action, s3_constants.LimitTypeBytes, 1200, 1200, 6, 200, 6},
  29. {action, s3_constants.LimitTypeBytes, 11990, 11990, 60, 200, 59},
  30. {action, s3_constants.LimitTypeBytes, 11790, 11990, 70, 200, 58},
  31. }
  32. )
  33. func TestLimit(t *testing.T) {
  34. for _, tc := range TestLimitCases {
  35. circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{
  36. Global: &s3_pb.S3CircuitBreakerOptions{
  37. Enabled: true,
  38. Actions: map[string]int64{
  39. s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue,
  40. s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue,
  41. },
  42. },
  43. Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{
  44. bucket: {
  45. Enabled: true,
  46. Actions: map[string]int64{
  47. s3_constants.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue,
  48. },
  49. },
  50. },
  51. }
  52. circuitBreaker := &CircuitBreaker{
  53. counters: make(map[string]*int64),
  54. limitations: make(map[string]int64),
  55. }
  56. err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig)
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: tc.reqBytes})
  61. if successCount != tc.successCount {
  62. t.Errorf("successCount not equal, expect=%d, actual=%d", tc.successCount, successCount)
  63. }
  64. }
  65. }
  66. func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request) int64 {
  67. var successCounter int64
  68. resultCh := make(chan []func(), routineCount)
  69. var wg sync.WaitGroup
  70. for i := 0; i < routineCount; i++ {
  71. wg.Add(1)
  72. go func() {
  73. defer wg.Done()
  74. rollbackFn, errCode := circuitBreaker.limit(r, bucket, action)
  75. if errCode == s3err.ErrNone {
  76. atomic.AddInt64(&successCounter, 1)
  77. }
  78. resultCh <- rollbackFn
  79. }()
  80. }
  81. wg.Wait()
  82. close(resultCh)
  83. for fns := range resultCh {
  84. for _, fn := range fns {
  85. fn()
  86. }
  87. }
  88. return successCounter
  89. }