|
@ -16,7 +16,7 @@ import ( |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type CircuitBreaker struct { |
|
|
type CircuitBreaker struct { |
|
|
sync.Mutex |
|
|
|
|
|
|
|
|
sync.RWMutex |
|
|
Enabled bool |
|
|
Enabled bool |
|
|
counters map[string]*int64 |
|
|
counters map[string]*int64 |
|
|
limitations map[string]int64 |
|
|
limitations map[string]int64 |
|
@ -110,7 +110,7 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), |
|
|
func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { |
|
|
func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { |
|
|
|
|
|
|
|
|
//bucket simultaneous request count
|
|
|
//bucket simultaneous request count
|
|
|
bucketCountRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest) |
|
|
|
|
|
|
|
|
bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) |
|
|
if bucketCountRollBack != nil { |
|
|
if bucketCountRollBack != nil { |
|
|
rollback = append(rollback, bucketCountRollBack) |
|
|
rollback = append(rollback, bucketCountRollBack) |
|
|
} |
|
|
} |
|
@ -119,7 +119,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//bucket simultaneous request content bytes
|
|
|
//bucket simultaneous request content bytes
|
|
|
bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) |
|
|
|
|
|
|
|
|
bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) |
|
|
if bucketContentLengthRollBack != nil { |
|
|
if bucketContentLengthRollBack != nil { |
|
|
rollback = append(rollback, bucketContentLengthRollBack) |
|
|
rollback = append(rollback, bucketContentLengthRollBack) |
|
|
} |
|
|
} |
|
@ -128,7 +128,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//global simultaneous request count
|
|
|
//global simultaneous request count
|
|
|
globalCountRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest) |
|
|
|
|
|
|
|
|
globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) |
|
|
if globalCountRollBack != nil { |
|
|
if globalCountRollBack != nil { |
|
|
rollback = append(rollback, globalCountRollBack) |
|
|
rollback = append(rollback, globalCountRollBack) |
|
|
} |
|
|
} |
|
@ -137,7 +137,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//global simultaneous request content bytes
|
|
|
//global simultaneous request content bytes
|
|
|
globalContentLengthRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) |
|
|
|
|
|
|
|
|
globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) |
|
|
if globalContentLengthRollBack != nil { |
|
|
if globalContentLengthRollBack != nil { |
|
|
rollback = append(rollback, globalContentLengthRollBack) |
|
|
rollback = append(rollback, globalContentLengthRollBack) |
|
|
} |
|
|
} |
|
@ -147,11 +147,13 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { |
|
|
|
|
|
key := s3_constants.Concat(bucket, action, limitType) |
|
|
|
|
|
|
|
|
func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { |
|
|
e = s3err.ErrNone |
|
|
e = s3err.ErrNone |
|
|
if max, ok := cb.limitations[key]; ok { |
|
|
if max, ok := cb.limitations[key]; ok { |
|
|
|
|
|
cb.RLock() |
|
|
counter, exists := cb.counters[key] |
|
|
counter, exists := cb.counters[key] |
|
|
|
|
|
cb.RUnlock() |
|
|
|
|
|
|
|
|
if !exists { |
|
|
if !exists { |
|
|
cb.Lock() |
|
|
cb.Lock() |
|
|
counter, exists = cb.counters[key] |
|
|
counter, exists = cb.counters[key] |
|
@ -171,7 +173,6 @@ func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string |
|
|
f = func() { |
|
|
f = func() { |
|
|
atomic.AddInt64(counter, -inc) |
|
|
atomic.AddInt64(counter, -inc) |
|
|
} |
|
|
} |
|
|
current = atomic.LoadInt64(counter) |
|
|
|
|
|
if current > max { |
|
|
if current > max { |
|
|
e = errCode |
|
|
e = errCode |
|
|
return |
|
|
return |
|
|