Browse Source
add s3 circuit breaker support for 'simultaneous request count' and 'simultaneous request bytes' limitations
add s3 circuit breaker support for 'simultaneous request count' and 'simultaneous request bytes' limitations
configure s3 circuit breaker by 'command_s3_circuitbreaker.go': usage eg: # Configure the number of simultaneous global (current s3api node) requests s3.circuit.breaker -global -type count -actions Write -values 1000 -apply # Configure the number of simultaneous requests for bucket x read and write s3.circuit.breaker -buckets -type count -actions Read,Write -values 1000 -apply # Configure the total bytes of simultaneous requests for bucket write s3.circuit.breaker -buckets -type bytes -actions Write -values 100MiB -apply # Disable circuit breaker config of bucket 'x' s3.circuit.breaker -buckets x -enable false -apply # Delete circuit breaker config of bucket 'x' s3.circuit.breaker -buckets x -delete -applypull/3189/head
石昌林
3 years ago
12 changed files with 819 additions and 72 deletions
-
3go.mod
-
3go.sum
-
16weed/config/s3_config.go
-
3weed/filer/s3iam_conf.go
-
10weed/pb/s3.proto
-
211weed/pb/s3_pb/s3.pb.go
-
33weed/s3api/auth_credentials_subscribe.go
-
174weed/s3api/s3api_circuit_breaker.go
-
83weed/s3api/s3api_server.go
-
13weed/s3api/s3err/s3api_errors.go
-
335weed/shell/command_s3_circuitbreaker.go
-
7weed/shell/command_s3_circuitbreaker_test.go
@ -0,0 +1,16 @@ |
|||
package config |
|||
|
|||
import "strings" |
|||
|
|||
var ( |
|||
CircuitBreakerConfigDir = "/etc/s3" |
|||
CircuitBreakerConfigFile = "circuit_breaker.json" |
|||
AllowedActions = []string{"Read", "Write", "List", "Tagging", "Admin"} |
|||
LimitTypeCount = "count" |
|||
LimitTypeBytes = "bytes" |
|||
Separator = ":" |
|||
) |
|||
|
|||
func Concat(elements ...string) string { |
|||
return strings.Join(elements, Separator) |
|||
} |
@ -0,0 +1,174 @@ |
|||
package s3api |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/config" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err" |
|||
"github.com/gorilla/mux" |
|||
"go.uber.org/atomic" |
|||
"net/http" |
|||
) |
|||
|
|||
type CircuitBreaker struct { |
|||
Enabled bool |
|||
counters map[string]*atomic.Int64 |
|||
limitations map[string]int64 |
|||
} |
|||
|
|||
func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { |
|||
cb := &CircuitBreaker{ |
|||
counters: make(map[string]*atomic.Int64), |
|||
limitations: make(map[string]int64), |
|||
} |
|||
|
|||
_ = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
content, err := filer.ReadInsideFiler(client, config.CircuitBreakerConfigDir, config.CircuitBreakerConfigFile) |
|||
if err == nil { |
|||
err = cb.LoadS3ApiConfigurationFromBytes(content) |
|||
} |
|||
if err != nil { |
|||
glog.Warningf("load s3 circuit breaker config from filer: %v", err) |
|||
} else { |
|||
glog.V(2).Infof("load s3 circuit breaker config complete: %v", cb) |
|||
} |
|||
return err |
|||
}) |
|||
return cb |
|||
} |
|||
|
|||
func (cb *CircuitBreaker) LoadS3ApiConfigurationFromBytes(content []byte) error { |
|||
cbCfg := &s3_pb.S3CircuitBreakerConfig{} |
|||
if err := filer.ParseS3ConfigurationFromBytes(content, cbCfg); err != nil { |
|||
glog.Warningf("unmarshal error: %v", err) |
|||
return fmt.Errorf("unmarshal error: %v", err) |
|||
} |
|||
if err := cb.loadCbCfg(cbCfg); err != nil { |
|||
return err |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (cb *CircuitBreaker) loadCbCfg(cfg *s3_pb.S3CircuitBreakerConfig) error { |
|||
|
|||
//global
|
|||
globalEnabled := false |
|||
globalOptions := cfg.Global |
|||
limitations := make(map[string]int64) |
|||
if globalOptions != nil && globalOptions.Enabled && len(globalOptions.Actions) > 0 { |
|||
globalEnabled = globalOptions.Enabled |
|||
for action, limit := range globalOptions.Actions { |
|||
limitations[action] = limit |
|||
} |
|||
} |
|||
cb.Enabled = globalEnabled |
|||
|
|||
//buckets
|
|||
for bucket, cbOptions := range cfg.Buckets { |
|||
if cbOptions.Enabled { |
|||
for action, limit := range cbOptions.Actions { |
|||
limitations[config.Concat(bucket, action)] = limit |
|||
} |
|||
} |
|||
} |
|||
|
|||
cb.limitations = limitations |
|||
return nil |
|||
} |
|||
|
|||
func (cb *CircuitBreaker) Check(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { |
|||
return func(w http.ResponseWriter, r *http.Request) { |
|||
if !cb.Enabled { |
|||
f(w, r) |
|||
return |
|||
} |
|||
|
|||
vars := mux.Vars(r) |
|||
bucket := vars["bucket"] |
|||
|
|||
rollback, errCode := cb.check(r, bucket, action) |
|||
defer func() { |
|||
for _, rf := range rollback { |
|||
rf() |
|||
} |
|||
}() |
|||
|
|||
if errCode == s3err.ErrNone { |
|||
f(w, r) |
|||
return |
|||
} |
|||
s3err.WriteErrorResponse(w, r, errCode) |
|||
}, Action(action) |
|||
} |
|||
|
|||
func (cb *CircuitBreaker) check(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { |
|||
|
|||
//bucket simultaneous request count
|
|||
bucketCountRollBack, errCode := cb.loadAndCompare(bucket, action, config.LimitTypeCount, 1, s3err.ErrTooManyRequest) |
|||
if bucketCountRollBack != nil { |
|||
rollback = append(rollback, bucketCountRollBack) |
|||
} |
|||
if errCode != s3err.ErrNone { |
|||
return |
|||
} |
|||
|
|||
//bucket simultaneous request content bytes
|
|||
bucketContentLengthRollBack, errCode := cb.loadAndCompare(bucket, action, config.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) |
|||
if bucketContentLengthRollBack != nil { |
|||
rollback = append(rollback, bucketContentLengthRollBack) |
|||
} |
|||
if errCode != s3err.ErrNone { |
|||
return |
|||
} |
|||
|
|||
//global simultaneous request count
|
|||
globalCountRollBack, errCode := cb.loadAndCompare("", action, config.LimitTypeCount, 1, s3err.ErrTooManyRequest) |
|||
if globalCountRollBack != nil { |
|||
rollback = append(rollback, globalCountRollBack) |
|||
} |
|||
if errCode != s3err.ErrNone { |
|||
return |
|||
} |
|||
|
|||
//global simultaneous request content bytes
|
|||
globalContentLengthRollBack, errCode := cb.loadAndCompare("", action, config.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) |
|||
if globalContentLengthRollBack != nil { |
|||
rollback = append(rollback, globalContentLengthRollBack) |
|||
} |
|||
if errCode != s3err.ErrNone { |
|||
return |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (cb CircuitBreaker) loadAndCompare(bucket, action, limitType string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { |
|||
key := config.Concat(bucket, action, limitType) |
|||
e = s3err.ErrNone |
|||
if max, ok := cb.limitations[key]; ok { |
|||
counter, exists := cb.counters[key] |
|||
if !exists { |
|||
counter = atomic.NewInt64(0) |
|||
cb.counters[key] = counter |
|||
} |
|||
current := counter.Load() |
|||
if current+inc > max { |
|||
e = errCode |
|||
return |
|||
} else { |
|||
counter.Add(inc) |
|||
f = func() { |
|||
counter.Sub(inc) |
|||
} |
|||
current = counter.Load() |
|||
if current+inc > max { |
|||
e = errCode |
|||
return |
|||
} |
|||
} |
|||
} |
|||
return |
|||
} |
@ -0,0 +1,335 @@ |
|||
package shell |
|||
|
|||
import ( |
|||
"bytes" |
|||
"flag" |
|||
"fmt" |
|||
"github.com/alecthomas/units" |
|||
"github.com/chrislusf/seaweedfs/weed/config" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" |
|||
"io" |
|||
"strconv" |
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
func init() { |
|||
Commands = append(Commands, &commandS3CircuitBreaker{}) |
|||
} |
|||
|
|||
type commandS3CircuitBreaker struct { |
|||
} |
|||
|
|||
func (c *commandS3CircuitBreaker) Name() string { |
|||
return "s3.circuit.breaker" |
|||
} |
|||
|
|||
func (c *commandS3CircuitBreaker) Help() string { |
|||
return `configure and apply s3 circuit breaker options for each bucket |
|||
|
|||
# examples |
|||
# add |
|||
s3.circuit.breaker -actions Read,Write -values 500,200 -global -enable -apply -type count |
|||
s3.circuit.breaker -actions Write -values 200MiB -global -enable -apply -type bytes |
|||
s3.circuit.breaker -actions Write -values 200MiB -bucket x,y,z -enable -apply -type bytes |
|||
|
|||
#delete |
|||
s3.circuit.breaker -actions Write -bucket x,y,z -delete -apply -type bytes |
|||
s3.circuit.breaker -actions Write -bucket x,y,z -delete -apply |
|||
s3.circuit.breaker -actions Write -delete -apply |
|||
` |
|||
} |
|||
|
|||
func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { |
|||
dir := config.CircuitBreakerConfigDir |
|||
file := config.CircuitBreakerConfigFile |
|||
|
|||
s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) |
|||
buckets := s3CircuitBreakerCommand.String("buckets", "", "comma separated buckets names") |
|||
global := s3CircuitBreakerCommand.Bool("global", false, "comma separated buckets names") |
|||
|
|||
actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin") |
|||
limitType := s3CircuitBreakerCommand.String("type", "", "count|bytes simultaneous requests count") |
|||
values := s3CircuitBreakerCommand.String("values", "", "comma separated max values,Maximum number of simultaneous requests content length, support byte unit: eg: 1k, 10m, 1g") |
|||
|
|||
enabled := s3CircuitBreakerCommand.Bool("enable", true, "enable or disable circuit breaker") |
|||
deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete users, actions or access keys") |
|||
|
|||
apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration") |
|||
|
|||
if err = s3CircuitBreakerCommand.Parse(args); err != nil { |
|||
return nil |
|||
} |
|||
|
|||
var buf bytes.Buffer |
|||
if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, &buf) |
|||
}); err != nil && err != filer_pb.ErrNotFound { |
|||
return err |
|||
} |
|||
|
|||
cbCfg := &s3_pb.S3CircuitBreakerConfig{ |
|||
Buckets: make(map[string]*s3_pb.CbOptions), |
|||
} |
|||
if buf.Len() > 0 { |
|||
if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if *deleted { |
|||
cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
if len(cmdBuckets) <= 0 && !*global { |
|||
if len(cmdActions) > 0 { |
|||
deleteGlobalActions(cbCfg, cmdActions, limitType) |
|||
if cbCfg.Buckets != nil { |
|||
var allBuckets []string |
|||
for bucket, _ := range cbCfg.Buckets { |
|||
allBuckets = append(allBuckets, bucket) |
|||
} |
|||
deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType) |
|||
} |
|||
} else { |
|||
cbCfg.Global = nil |
|||
cbCfg.Buckets = nil |
|||
} |
|||
} else { |
|||
if len(cmdBuckets) > 0 { |
|||
deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType) |
|||
} |
|||
if *global { |
|||
deleteGlobalActions(cbCfg, cmdActions, nil) |
|||
} |
|||
} |
|||
} else { |
|||
cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, false) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global { |
|||
return fmt.Errorf("one of -global and -buckets must be specified") |
|||
} |
|||
|
|||
if len(*buckets) > 0 { |
|||
for _, bucket := range cmdBuckets { |
|||
var cbOptions *s3_pb.CbOptions |
|||
var exists bool |
|||
if cbOptions, exists = cbCfg.Buckets[bucket]; !exists { |
|||
cbOptions = &s3_pb.CbOptions{} |
|||
cbCfg.Buckets[bucket] = cbOptions |
|||
} |
|||
cbOptions.Enabled = *enabled |
|||
|
|||
if len(cmdActions) > 0 { |
|||
err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled { |
|||
delete(cbCfg.Buckets, bucket) |
|||
} |
|||
} |
|||
} |
|||
|
|||
if *global { |
|||
globalOptions := cbCfg.Global |
|||
if globalOptions == nil { |
|||
globalOptions = &s3_pb.CbOptions{Actions: make(map[string]int64, len(cmdActions))} |
|||
cbCfg.Global = globalOptions |
|||
} |
|||
globalOptions.Enabled = *enabled |
|||
|
|||
if len(cmdActions) > 0 { |
|||
err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled { |
|||
cbCfg.Global = nil |
|||
} |
|||
} |
|||
} |
|||
|
|||
buf.Reset() |
|||
err = filer.ProtoToText(&buf, cbCfg) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
fmt.Fprintf(writer, string(buf.Bytes())) |
|||
fmt.Fprintln(writer) |
|||
|
|||
if *apply { |
|||
if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
return filer.SaveInsideFiler(client, dir, file, buf.Bytes()) |
|||
}); err != nil { |
|||
return err |
|||
} |
|||
|
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func insertOrUpdateValues(cbOptions *s3_pb.CbOptions, cmdActions []string, cmdValues []int64, limitType *string) error { |
|||
if len(*limitType) == 0 { |
|||
return fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed") |
|||
} |
|||
|
|||
if cbOptions.Actions == nil { |
|||
cbOptions.Actions = make(map[string]int64, len(cmdActions)) |
|||
} |
|||
|
|||
if len(cmdValues) > 0 { |
|||
for i, action := range cmdActions { |
|||
cbOptions.Actions[config.Concat(action, *limitType)] = cmdValues[i] |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) { |
|||
if cbCfg.Buckets == nil { |
|||
return |
|||
} |
|||
|
|||
if len(cmdActions) == 0 { |
|||
for _, bucket := range cmdBuckets { |
|||
delete(cbCfg.Buckets, bucket) |
|||
} |
|||
} else { |
|||
for _, bucket := range cmdBuckets { |
|||
if cbOption, ok := cbCfg.Buckets[bucket]; ok { |
|||
if len(cmdActions) > 0 && cbOption.Actions != nil { |
|||
for _, action := range cmdActions { |
|||
delete(cbOption.Actions, config.Concat(action, *limitType)) |
|||
} |
|||
} |
|||
|
|||
if len(cbOption.Actions) == 0 && !cbOption.Enabled { |
|||
delete(cbCfg.Buckets, bucket) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
if len(cbCfg.Buckets) == 0 { |
|||
cbCfg.Buckets = nil |
|||
} |
|||
} |
|||
|
|||
func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) { |
|||
globalOptions := cbCfg.Global |
|||
if globalOptions == nil { |
|||
return |
|||
} |
|||
|
|||
if len(cmdActions) == 0 && globalOptions.Actions != nil { |
|||
globalOptions.Actions = nil |
|||
return |
|||
} else { |
|||
for _, action := range cmdActions { |
|||
delete(globalOptions.Actions, config.Concat(action, *limitType)) |
|||
} |
|||
} |
|||
|
|||
if len(globalOptions.Actions) == 0 && !globalOptions.Enabled { |
|||
cbCfg.Global = nil |
|||
} |
|||
} |
|||
|
|||
func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, deleteOp bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) { |
|||
if len(*buckets) > 0 { |
|||
cmdBuckets = strings.Split(*buckets, ",") |
|||
} |
|||
|
|||
if len(*actions) > 0 { |
|||
cmdActions = strings.Split(*actions, ",") |
|||
|
|||
//check action valid
|
|||
for _, action := range cmdActions { |
|||
var found bool |
|||
for _, allowedAction := range config.AllowedActions { |
|||
if allowedAction == action { |
|||
found = true |
|||
} |
|||
} |
|||
if !found { |
|||
return nil, nil, nil, fmt.Errorf("value(%s) of flag[-action] not valid, allowed actions: %v", *actions, config.AllowedActions) |
|||
} |
|||
} |
|||
} |
|||
|
|||
if !deleteOp { |
|||
if len(cmdActions) < 0 { |
|||
for _, action := range config.AllowedActions { |
|||
cmdActions = append(cmdActions, action) |
|||
} |
|||
} |
|||
|
|||
if len(*limitType) > 0 { |
|||
switch *limitType { |
|||
case config.LimitTypeCount: |
|||
elements := strings.Split(*values, ",") |
|||
if len(cmdActions) != len(elements) { |
|||
if len(elements) != 1 || len(elements) == 0 { |
|||
return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal") |
|||
} |
|||
v, err := strconv.Atoi(elements[0]) |
|||
if err != nil { |
|||
return nil, nil, nil, fmt.Errorf("value of -counts must be a legal number(s)") |
|||
} |
|||
for range cmdActions { |
|||
cmdValues = append(cmdValues, int64(v)) |
|||
} |
|||
} else { |
|||
for _, value := range elements { |
|||
v, err := strconv.Atoi(value) |
|||
if err != nil { |
|||
return nil, nil, nil, fmt.Errorf("value of -counts must be a legal number(s)") |
|||
} |
|||
cmdValues = append(cmdValues, int64(v)) |
|||
} |
|||
} |
|||
case config.LimitTypeBytes: |
|||
elements := strings.Split(*values, ",") |
|||
if len(cmdActions) != len(elements) { |
|||
if len(elements) != 1 || len(elements) == 0 { |
|||
return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal") |
|||
} |
|||
v, err := units.ParseStrictBytes(elements[0]) |
|||
if err != nil { |
|||
return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)") |
|||
} |
|||
for range cmdActions { |
|||
cmdValues = append(cmdValues, v) |
|||
} |
|||
} else { |
|||
for _, value := range elements { |
|||
v, err := units.ParseStrictBytes(value) |
|||
if err != nil { |
|||
return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)") |
|||
} |
|||
cmdValues = append(cmdValues, v) |
|||
} |
|||
} |
|||
default: |
|||
return nil, nil, nil, fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed") |
|||
} |
|||
} else { |
|||
*limitType = "" |
|||
} |
|||
} |
|||
return cmdBuckets, cmdActions, cmdValues, nil |
|||
} |
@ -0,0 +1,7 @@ |
|||
package shell |
|||
|
|||
import "testing" |
|||
|
|||
func TestCircuitBreakerShell(t *testing.T) { |
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue