19 changed files with 1318 additions and 108 deletions
			
			
		- 
					2.github/workflows/depsreview.yml
- 
					12go.mod
- 
					36go.sum
- 
					2weed/command/update.go
- 
					7weed/filer/filechunk_manifest.go
- 
					8weed/filer/filechunks.go
- 
					3weed/filer/s3iam_conf.go
- 
					10weed/pb/s3.proto
- 
					221weed/pb/s3_pb/s3.pb.go
- 
					33weed/s3api/auth_credentials_subscribe.go
- 
					18weed/s3api/s3_constants/s3_config.go
- 
					183weed/s3api/s3api_circuit_breaker.go
- 
					107weed/s3api/s3api_circuit_breaker_test.go
- 
					13weed/s3api/s3api_object_multipart_handlers.go
- 
					84weed/s3api/s3api_server.go
- 
					13weed/s3api/s3err/s3api_errors.go
- 
					24weed/server/volume_grpc_copy.go
- 
					358weed/shell/command_s3_circuitbreaker.go
- 
					292weed/shell/command_s3_circuitbreaker_test.go
| @ -0,0 +1,18 @@ | |||
| package s3_constants | |||
| 
 | |||
| import ( | |||
| 	"strings" | |||
| ) | |||
| 
 | |||
| var ( | |||
| 	CircuitBreakerConfigDir  = "/etc/s3" | |||
| 	CircuitBreakerConfigFile = "circuit_breaker.json" | |||
| 	AllowedActions           = []string{ACTION_READ, ACTION_WRITE, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN} | |||
| 	LimitTypeCount           = "Count" | |||
| 	LimitTypeBytes           = "MB" | |||
| 	Separator                = ":" | |||
| ) | |||
| 
 | |||
| func Concat(elements ...string) string { | |||
| 	return strings.Join(elements, Separator) | |||
| } | |||
| @ -0,0 +1,183 @@ | |||
| package s3api | |||
| 
 | |||
| import ( | |||
| 	"fmt" | |||
| 	"github.com/chrislusf/seaweedfs/weed/filer" | |||
| 	"github.com/chrislusf/seaweedfs/weed/glog" | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb" | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" | |||
| 	"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" | |||
| 	"github.com/chrislusf/seaweedfs/weed/s3api/s3err" | |||
| 	"github.com/gorilla/mux" | |||
| 	"net/http" | |||
| 	"sync" | |||
| 	"sync/atomic" | |||
| ) | |||
| 
 | |||
| type CircuitBreaker struct { | |||
| 	sync.RWMutex | |||
| 	Enabled     bool | |||
| 	counters    map[string]*int64 | |||
| 	limitations map[string]int64 | |||
| } | |||
| 
 | |||
| func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { | |||
| 	cb := &CircuitBreaker{ | |||
| 		counters:    make(map[string]*int64), | |||
| 		limitations: make(map[string]int64), | |||
| 	} | |||
| 
 | |||
| 	err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { | |||
| 		content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) | |||
| 		if err != nil { | |||
| 			return fmt.Errorf("read S3 circuit breaker config: %v", err) | |||
| 		} | |||
| 		return cb.LoadS3ApiConfigurationFromBytes(content) | |||
| 	}) | |||
| 
 | |||
| 	if err != nil { | |||
| 		glog.Warningf("fail to load config: %v", err) | |||
| 	} | |||
| 
 | |||
| 	return cb | |||
| } | |||
| 
 | |||
| func (cb *CircuitBreaker) LoadS3ApiConfigurationFromBytes(content []byte) error { | |||
| 	cbCfg := &s3_pb.S3CircuitBreakerConfig{} | |||
| 	if err := filer.ParseS3ConfigurationFromBytes(content, cbCfg); err != nil { | |||
| 		glog.Warningf("unmarshal error: %v", err) | |||
| 		return fmt.Errorf("unmarshal error: %v", err) | |||
| 	} | |||
| 	if err := cb.loadCircuitBreakerConfig(cbCfg); err != nil { | |||
| 		return err | |||
| 	} | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerConfig) error { | |||
| 
 | |||
| 	//global
 | |||
| 	globalEnabled := false | |||
| 	globalOptions := cfg.Global | |||
| 	limitations := make(map[string]int64) | |||
| 	if globalOptions != nil && globalOptions.Enabled && len(globalOptions.Actions) > 0 { | |||
| 		globalEnabled = globalOptions.Enabled | |||
| 		for action, limit := range globalOptions.Actions { | |||
| 			limitations[action] = limit | |||
| 		} | |||
| 	} | |||
| 	cb.Enabled = globalEnabled | |||
| 
 | |||
| 	//buckets
 | |||
| 	for bucket, cbOptions := range cfg.Buckets { | |||
| 		if cbOptions.Enabled { | |||
| 			for action, limit := range cbOptions.Actions { | |||
| 				limitations[s3_constants.Concat(bucket, action)] = limit | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	cb.limitations = limitations | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) { | |||
| 	return func(w http.ResponseWriter, r *http.Request) { | |||
| 		if !cb.Enabled { | |||
| 			f(w, r) | |||
| 			return | |||
| 		} | |||
| 
 | |||
| 		vars := mux.Vars(r) | |||
| 		bucket := vars["bucket"] | |||
| 
 | |||
| 		rollback, errCode := cb.limit(r, bucket, action) | |||
| 		defer func() { | |||
| 			for _, rf := range rollback { | |||
| 				rf() | |||
| 			} | |||
| 		}() | |||
| 
 | |||
| 		if errCode == s3err.ErrNone { | |||
| 			f(w, r) | |||
| 			return | |||
| 		} | |||
| 		s3err.WriteErrorResponse(w, r, errCode) | |||
| 	}, Action(action) | |||
| } | |||
| 
 | |||
| func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { | |||
| 
 | |||
| 	//bucket simultaneous request count
 | |||
| 	bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) | |||
| 	if bucketCountRollBack != nil { | |||
| 		rollback = append(rollback, bucketCountRollBack) | |||
| 	} | |||
| 	if errCode != s3err.ErrNone { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	//bucket simultaneous request content bytes
 | |||
| 	bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) | |||
| 	if bucketContentLengthRollBack != nil { | |||
| 		rollback = append(rollback, bucketContentLengthRollBack) | |||
| 	} | |||
| 	if errCode != s3err.ErrNone { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	//global simultaneous request count
 | |||
| 	globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) | |||
| 	if globalCountRollBack != nil { | |||
| 		rollback = append(rollback, globalCountRollBack) | |||
| 	} | |||
| 	if errCode != s3err.ErrNone { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	//global simultaneous request content bytes
 | |||
| 	globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) | |||
| 	if globalContentLengthRollBack != nil { | |||
| 		rollback = append(rollback, globalContentLengthRollBack) | |||
| 	} | |||
| 	if errCode != s3err.ErrNone { | |||
| 		return | |||
| 	} | |||
| 	return | |||
| } | |||
| 
 | |||
| func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { | |||
| 	e = s3err.ErrNone | |||
| 	if max, ok := cb.limitations[key]; ok { | |||
| 		cb.RLock() | |||
| 		counter, exists := cb.counters[key] | |||
| 		cb.RUnlock() | |||
| 
 | |||
| 		if !exists { | |||
| 			cb.Lock() | |||
| 			counter, exists = cb.counters[key] | |||
| 			if !exists { | |||
| 				var newCounter int64 | |||
| 				counter = &newCounter | |||
| 				cb.counters[key] = counter | |||
| 			} | |||
| 			cb.Unlock() | |||
| 		} | |||
| 		current := atomic.LoadInt64(counter) | |||
| 		if current+inc > max { | |||
| 			e = errCode | |||
| 			return | |||
| 		} else { | |||
| 			current := atomic.AddInt64(counter, inc) | |||
| 			f = func() { | |||
| 				atomic.AddInt64(counter, -inc) | |||
| 			} | |||
| 			if current > max { | |||
| 				e = errCode | |||
| 				return | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 	return | |||
| } | |||
| @ -0,0 +1,107 @@ | |||
| package s3api | |||
| 
 | |||
| import ( | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" | |||
| 	"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" | |||
| 	"github.com/chrislusf/seaweedfs/weed/s3api/s3err" | |||
| 	"net/http" | |||
| 	"sync" | |||
| 	"sync/atomic" | |||
| 	"testing" | |||
| ) | |||
| 
 | |||
| type TestLimitCase struct { | |||
| 	actionName string | |||
| 
 | |||
| 	limitType        string | |||
| 	bucketLimitValue int64 | |||
| 	globalLimitValue int64 | |||
| 
 | |||
| 	routineCount int | |||
| 	successCount int64 | |||
| } | |||
| 
 | |||
| var ( | |||
| 	bucket         = "/test" | |||
| 	action         = s3_constants.ACTION_WRITE | |||
| 	fileSize int64 = 200 | |||
| 
 | |||
| 	TestLimitCases = []*TestLimitCase{ | |||
| 
 | |||
| 		//bucket-LimitTypeCount
 | |||
| 		{action, s3_constants.LimitTypeCount, 5, 6, 60, 5}, | |||
| 		{action, s3_constants.LimitTypeCount, 0, 6, 6, 0}, | |||
| 
 | |||
| 		//global-LimitTypeCount
 | |||
| 		{action, s3_constants.LimitTypeCount, 6, 5, 6, 5}, | |||
| 		{action, s3_constants.LimitTypeCount, 6, 0, 6, 0}, | |||
| 
 | |||
| 		//bucket-LimitTypeBytes
 | |||
| 		{action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5}, | |||
| 		{action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0}, | |||
| 
 | |||
| 		//global-LimitTypeBytes
 | |||
| 		{action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5}, | |||
| 		{action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0}, | |||
| 	} | |||
| ) | |||
| 
 | |||
| func TestLimit(t *testing.T) { | |||
| 	for _, tc := range TestLimitCases { | |||
| 		circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{ | |||
| 			Global: &s3_pb.S3CircuitBreakerOptions{ | |||
| 				Enabled: true, | |||
| 				Actions: map[string]int64{ | |||
| 					s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue, | |||
| 					s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue, | |||
| 				}, | |||
| 			}, | |||
| 			Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{ | |||
| 				bucket: { | |||
| 					Enabled: true, | |||
| 					Actions: map[string]int64{ | |||
| 						s3_constants.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue, | |||
| 					}, | |||
| 				}, | |||
| 			}, | |||
| 		} | |||
| 		circuitBreaker := &CircuitBreaker{ | |||
| 			counters:    make(map[string]*int64), | |||
| 			limitations: make(map[string]int64), | |||
| 		} | |||
| 		err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig) | |||
| 		if err != nil { | |||
| 			t.Fatal(err) | |||
| 		} | |||
| 
 | |||
| 		successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName) | |||
| 		if successCount != tc.successCount { | |||
| 			t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc) | |||
| 		} | |||
| 	} | |||
| } | |||
| 
 | |||
| func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 { | |||
| 	var successCounter int64 | |||
| 	resultCh := make(chan []func(), routineCount) | |||
| 	var wg sync.WaitGroup | |||
| 	for i := 0; i < routineCount; i++ { | |||
| 		wg.Add(1) | |||
| 		go func() { | |||
| 			defer wg.Done() | |||
| 			rollbackFn, errCode := circuitBreaker.limit(r, bucket, action) | |||
| 			if errCode == s3err.ErrNone { | |||
| 				atomic.AddInt64(&successCounter, 1) | |||
| 			} | |||
| 			resultCh <- rollbackFn | |||
| 		}() | |||
| 	} | |||
| 	wg.Wait() | |||
| 	close(resultCh) | |||
| 	for fns := range resultCh { | |||
| 		for _, fn := range fns { | |||
| 			fn() | |||
| 		} | |||
| 	} | |||
| 	return successCounter | |||
| } | |||
| @ -0,0 +1,358 @@ | |||
| package shell | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"flag" | |||
| 	"fmt" | |||
| 	"github.com/chrislusf/seaweedfs/weed/filer" | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" | |||
| 	"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" | |||
| 	"io" | |||
| 	"strconv" | |||
| 	"strings" | |||
| 
 | |||
| 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" | |||
| ) | |||
| 
 | |||
| var LoadConfig = loadConfig | |||
| 
 | |||
| func init() { | |||
| 	Commands = append(Commands, &commandS3CircuitBreaker{}) | |||
| } | |||
| 
 | |||
| type commandS3CircuitBreaker struct { | |||
| } | |||
| 
 | |||
| func (c *commandS3CircuitBreaker) Name() string { | |||
| 	return "s3.circuitBreaker" | |||
| } | |||
| 
 | |||
| func (c *commandS3CircuitBreaker) Help() string { | |||
| 	return `configure and apply s3 circuit breaker options for each bucket | |||
| 
 | |||
| 	# examples | |||
| 	# add circuit breaker config for global | |||
| 	s3.circuitBreaker -global -type count -actions Read,Write -values 500,200 -apply | |||
| 
 | |||
| 	# disable global config | |||
| 	s3.circuitBreaker -global -disable -apply | |||
| 
 | |||
| 	# add circuit breaker config for buckets x,y,z | |||
| 	s3.circuitBreaker -buckets x,y,z -type count -actions Read,Write -values 200,100 -apply | |||
| 
 | |||
| 	# disable circuit breaker config of x | |||
| 	s3.circuitBreaker -buckets x -disable -apply | |||
| 
 | |||
| 	# delete circuit breaker config of x | |||
| 	s3.circuitBreaker -buckets x -delete -apply | |||
| 
 | |||
| 	# clear all circuit breaker config | |||
| 	s3.circuitBreaker -delete -apply | |||
| 	` | |||
| } | |||
| 
 | |||
| func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { | |||
| 	dir := s3_constants.CircuitBreakerConfigDir | |||
| 	file := s3_constants.CircuitBreakerConfigFile | |||
| 
 | |||
| 	s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) | |||
| 	buckets := s3CircuitBreakerCommand.String("buckets", "", "the bucket name(s) to configure, eg: -buckets x,y,z") | |||
| 	global := s3CircuitBreakerCommand.Bool("global", false, "configure global circuit breaker") | |||
| 
 | |||
| 	actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin") | |||
| 	limitType := s3CircuitBreakerCommand.String("type", "", "'Count' or 'MB'; Count represents the number of simultaneous requests, and MB represents the content size of all simultaneous requests") | |||
| 	values := s3CircuitBreakerCommand.String("values", "", "comma separated values") | |||
| 
 | |||
| 	disabled := s3CircuitBreakerCommand.Bool("disable", false, "disable global or buckets circuit breaker") | |||
| 	deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete circuit breaker config") | |||
| 
 | |||
| 	apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration") | |||
| 
 | |||
| 	if err = s3CircuitBreakerCommand.Parse(args); err != nil { | |||
| 		return nil | |||
| 
 | |||
| 	} | |||
| 
 | |||
| 	var buf bytes.Buffer | |||
| 	err = LoadConfig(commandEnv, dir, file, &buf) | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 
 | |||
| 	cbCfg := &s3_pb.S3CircuitBreakerConfig{ | |||
| 		Buckets: make(map[string]*s3_pb.S3CircuitBreakerOptions), | |||
| 	} | |||
| 	if buf.Len() > 0 { | |||
| 		if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil { | |||
| 			return err | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	if *deleted { | |||
| 		cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true) | |||
| 		if err != nil { | |||
| 			return err | |||
| 		} | |||
| 
 | |||
| 		if len(cmdBuckets) <= 0 && !*global { | |||
| 			if len(cmdActions) > 0 { | |||
| 				deleteGlobalActions(cbCfg, cmdActions, limitType) | |||
| 				if cbCfg.Buckets != nil { | |||
| 					var allBuckets []string | |||
| 					for bucket := range cbCfg.Buckets { | |||
| 						allBuckets = append(allBuckets, bucket) | |||
| 					} | |||
| 					deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType) | |||
| 				} | |||
| 			} else { | |||
| 				cbCfg.Global = nil | |||
| 				cbCfg.Buckets = nil | |||
| 			} | |||
| 		} else { | |||
| 			if len(cmdBuckets) > 0 { | |||
| 				deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType) | |||
| 			} | |||
| 			if *global { | |||
| 				deleteGlobalActions(cbCfg, cmdActions, nil) | |||
| 			} | |||
| 		} | |||
| 	} else { | |||
| 		cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, *disabled) | |||
| 		if err != nil { | |||
| 			return err | |||
| 		} | |||
| 
 | |||
| 		if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global { | |||
| 			return fmt.Errorf("one of -global and -buckets must be specified") | |||
| 		} | |||
| 
 | |||
| 		if len(*buckets) > 0 { | |||
| 			for _, bucket := range cmdBuckets { | |||
| 				var cbOptions *s3_pb.S3CircuitBreakerOptions | |||
| 				var exists bool | |||
| 				if cbOptions, exists = cbCfg.Buckets[bucket]; !exists { | |||
| 					cbOptions = &s3_pb.S3CircuitBreakerOptions{} | |||
| 					cbCfg.Buckets[bucket] = cbOptions | |||
| 				} | |||
| 				cbOptions.Enabled = !*disabled | |||
| 
 | |||
| 				if len(cmdActions) > 0 { | |||
| 					err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType) | |||
| 					if err != nil { | |||
| 						return err | |||
| 					} | |||
| 				} | |||
| 
 | |||
| 				if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled { | |||
| 					delete(cbCfg.Buckets, bucket) | |||
| 				} | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if *global { | |||
| 			globalOptions := cbCfg.Global | |||
| 			if globalOptions == nil { | |||
| 				globalOptions = &s3_pb.S3CircuitBreakerOptions{Actions: make(map[string]int64, len(cmdActions))} | |||
| 				cbCfg.Global = globalOptions | |||
| 			} | |||
| 			globalOptions.Enabled = !*disabled | |||
| 
 | |||
| 			if len(cmdActions) > 0 { | |||
| 				err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType) | |||
| 				if err != nil { | |||
| 					return err | |||
| 				} | |||
| 			} | |||
| 
 | |||
| 			if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled { | |||
| 				cbCfg.Global = nil | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	buf.Reset() | |||
| 	err = filer.ProtoToText(&buf, cbCfg) | |||
| 	if err != nil { | |||
| 		return err | |||
| 	} | |||
| 
 | |||
| 	_, _ = fmt.Fprintf(writer, string(buf.Bytes())) | |||
| 	_, _ = fmt.Fprintln(writer) | |||
| 
 | |||
| 	if *apply { | |||
| 		if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { | |||
| 			return filer.SaveInsideFiler(client, dir, file, buf.Bytes()) | |||
| 		}); err != nil { | |||
| 			return err | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func loadConfig(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error { | |||
| 	if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { | |||
| 		return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, buf) | |||
| 	}); err != nil && err != filer_pb.ErrNotFound { | |||
| 		return err | |||
| 	} | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func insertOrUpdateValues(cbOptions *s3_pb.S3CircuitBreakerOptions, cmdActions []string, cmdValues []int64, limitType *string) error { | |||
| 	if len(*limitType) == 0 { | |||
| 		return fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed") | |||
| 	} | |||
| 
 | |||
| 	if cbOptions.Actions == nil { | |||
| 		cbOptions.Actions = make(map[string]int64, len(cmdActions)) | |||
| 	} | |||
| 
 | |||
| 	if len(cmdValues) > 0 { | |||
| 		for i, action := range cmdActions { | |||
| 			cbOptions.Actions[s3_constants.Concat(action, *limitType)] = cmdValues[i] | |||
| 		} | |||
| 	} | |||
| 	return nil | |||
| } | |||
| 
 | |||
| func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) { | |||
| 	if cbCfg.Buckets == nil { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	if len(cmdActions) == 0 { | |||
| 		for _, bucket := range cmdBuckets { | |||
| 			delete(cbCfg.Buckets, bucket) | |||
| 		} | |||
| 	} else { | |||
| 		for _, bucket := range cmdBuckets { | |||
| 			if cbOption, ok := cbCfg.Buckets[bucket]; ok { | |||
| 				if len(cmdActions) > 0 && cbOption.Actions != nil { | |||
| 					for _, action := range cmdActions { | |||
| 						delete(cbOption.Actions, s3_constants.Concat(action, *limitType)) | |||
| 					} | |||
| 				} | |||
| 
 | |||
| 				if len(cbOption.Actions) == 0 && !cbOption.Enabled { | |||
| 					delete(cbCfg.Buckets, bucket) | |||
| 				} | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	if len(cbCfg.Buckets) == 0 { | |||
| 		cbCfg.Buckets = nil | |||
| 	} | |||
| } | |||
| 
 | |||
| func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) { | |||
| 	globalOptions := cbCfg.Global | |||
| 	if globalOptions == nil { | |||
| 		return | |||
| 	} | |||
| 
 | |||
| 	if len(cmdActions) == 0 && globalOptions.Actions != nil { | |||
| 		globalOptions.Actions = nil | |||
| 		return | |||
| 	} else { | |||
| 		for _, action := range cmdActions { | |||
| 			delete(globalOptions.Actions, s3_constants.Concat(action, *limitType)) | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	if len(globalOptions.Actions) == 0 && !globalOptions.Enabled { | |||
| 		cbCfg.Global = nil | |||
| 	} | |||
| } | |||
| 
 | |||
| func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, parseValues bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) { | |||
| 	if len(*buckets) > 0 { | |||
| 		cmdBuckets = strings.Split(*buckets, ",") | |||
| 	} | |||
| 
 | |||
| 	if len(*actions) > 0 { | |||
| 		cmdActions = strings.Split(*actions, ",") | |||
| 
 | |||
| 		//check action valid
 | |||
| 		for _, action := range cmdActions { | |||
| 			var found bool | |||
| 			for _, allowedAction := range s3_constants.AllowedActions { | |||
| 				if allowedAction == action { | |||
| 					found = true | |||
| 				} | |||
| 			} | |||
| 			if !found { | |||
| 				return nil, nil, nil, fmt.Errorf("value(%s) of flag[-action] not valid, allowed actions: %v", *actions, s3_constants.AllowedActions) | |||
| 			} | |||
| 		} | |||
| 	} | |||
| 
 | |||
| 	if !parseValues { | |||
| 		if len(cmdActions) < 0 { | |||
| 			for _, action := range s3_constants.AllowedActions { | |||
| 				cmdActions = append(cmdActions, action) | |||
| 			} | |||
| 		} | |||
| 
 | |||
| 		if len(*limitType) > 0 { | |||
| 			switch *limitType { | |||
| 			case s3_constants.LimitTypeCount: | |||
| 				elements := strings.Split(*values, ",") | |||
| 				if len(cmdActions) != len(elements) { | |||
| 					if len(elements) != 1 || len(elements) == 0 { | |||
| 						return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal") | |||
| 					} | |||
| 					v, err := strconv.Atoi(elements[0]) | |||
| 					if err != nil { | |||
| 						return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)") | |||
| 					} | |||
| 					for range cmdActions { | |||
| 						cmdValues = append(cmdValues, int64(v)) | |||
| 					} | |||
| 				} else { | |||
| 					for _, value := range elements { | |||
| 						v, err := strconv.Atoi(value) | |||
| 						if err != nil { | |||
| 							return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)") | |||
| 						} | |||
| 						cmdValues = append(cmdValues, int64(v)) | |||
| 					} | |||
| 				} | |||
| 			case s3_constants.LimitTypeBytes: | |||
| 				elements := strings.Split(*values, ",") | |||
| 				if len(cmdActions) != len(elements) { | |||
| 					if len(elements) != 1 || len(elements) == 0 { | |||
| 						return nil, nil, nil, fmt.Errorf("values count of -actions and -values not equal") | |||
| 					} | |||
| 					v, err := parseMBToBytes(elements[0]) | |||
| 					if err != nil { | |||
| 						return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)") | |||
| 					} | |||
| 					for range cmdActions { | |||
| 						cmdValues = append(cmdValues, v) | |||
| 					} | |||
| 				} else { | |||
| 					for _, value := range elements { | |||
| 						v, err := parseMBToBytes(value) | |||
| 						if err != nil { | |||
| 							return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)") | |||
| 						} | |||
| 						cmdValues = append(cmdValues, v) | |||
| 					} | |||
| 				} | |||
| 			default: | |||
| 				return nil, nil, nil, fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed") | |||
| 			} | |||
| 		} else { | |||
| 			*limitType = "" | |||
| 		} | |||
| 	} | |||
| 	return cmdBuckets, cmdActions, cmdValues, nil | |||
| } | |||
| 
 | |||
| func parseMBToBytes(valStr string) (int64, error) { | |||
| 	v, err := strconv.Atoi(valStr) | |||
| 	v *= 1024 * 1024 | |||
| 	return int64(v), err | |||
| } | |||
| @ -0,0 +1,292 @@ | |||
| package shell | |||
| 
 | |||
| import ( | |||
| 	"bytes" | |||
| 	"encoding/json" | |||
| 	"reflect" | |||
| 	"strings" | |||
| 	"testing" | |||
| ) | |||
| 
 | |||
| type Case struct { | |||
| 	args   []string | |||
| 	result string | |||
| } | |||
| 
 | |||
| var ( | |||
| 	TestCases = []*Case{ | |||
| 		//add circuit breaker config for global
 | |||
| 		{ | |||
| 			args: strings.Split("-global -type Count -actions Read,Write -values 500,200", " "), | |||
| 			result: `{ | |||
| 				"global": { | |||
| 					"enabled": true, | |||
| 					"actions": { | |||
| 						"Read:Count": "500", | |||
| 						"Write:Count": "200" | |||
| 					} | |||
| 				} | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//disable global config
 | |||
| 		{ | |||
| 			args: strings.Split("-global -disable", " "), | |||
| 			result: `{ | |||
| 				"global": { | |||
| 					"actions": { | |||
| 						"Read:Count": "500", | |||
| 						"Write:Count": "200" | |||
| 					} | |||
| 				} | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//add circuit breaker config for buckets x,y,z
 | |||
| 		{ | |||
| 			args: strings.Split("-buckets x,y,z -type Count -actions Read,Write -values 200,100", " "), | |||
| 			result: `{ | |||
| 				"global": { | |||
| 					"actions": { | |||
| 						"Read:Count": "500", | |||
| 						"Write:Count": "200" | |||
| 					} | |||
| 				}, | |||
| 				"buckets": { | |||
| 					"x": { | |||
| 						"enabled": true, | |||
| 						"actions": { | |||
| 							"Read:Count": "200", | |||
| 							"Write:Count": "100" | |||
| 						} | |||
| 					}, | |||
| 					"y": { | |||
| 						"enabled": true, | |||
| 						"actions": { | |||
| 							"Read:Count": "200", | |||
| 							"Write:Count": "100" | |||
| 						} | |||
| 					}, | |||
| 					"z": { | |||
| 						"enabled": true, | |||
| 						"actions": { | |||
| 							"Read:Count": "200", | |||
| 							"Write:Count": "100" | |||
| 						} | |||
| 					} | |||
| 				} | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//disable circuit breaker config of x
 | |||
| 		{ | |||
| 			args: strings.Split("-buckets x -disable", " "), | |||
| 			result: `{ | |||
| 			  "global": { | |||
| 				"actions": { | |||
| 				  "Read:Count": "500", | |||
| 				  "Write:Count": "200" | |||
| 				} | |||
| 			  }, | |||
| 			  "buckets": { | |||
| 				"x": { | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				}, | |||
| 				"y": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				}, | |||
| 				"z": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				} | |||
| 			  } | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//delete circuit breaker config of x
 | |||
| 		{ | |||
| 			args: strings.Split("-buckets x -delete", " "), | |||
| 			result: `{ | |||
| 			  "global": { | |||
| 				"actions": { | |||
| 				  "Read:Count": "500", | |||
| 				  "Write:Count": "200" | |||
| 				} | |||
| 			  }, | |||
| 			  "buckets": { | |||
| 				"y": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				}, | |||
| 				"z": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				} | |||
| 			  } | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//configure the circuit breaker for the size of the uploaded file for bucket x,y
 | |||
| 		{ | |||
| 			args: strings.Split("-buckets x,y -type MB -actions Write -values 1024", " "), | |||
| 			result: `{ | |||
| 			  "global": { | |||
| 				"actions": { | |||
| 				  "Read:Count": "500", | |||
| 				  "Write:Count": "200" | |||
| 				} | |||
| 			  }, | |||
| 			  "buckets": { | |||
| 				"x": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Write:MB": "1073741824" | |||
| 				  } | |||
| 				}, | |||
| 				"y": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100", | |||
| 					"Write:MB": "1073741824" | |||
| 				  } | |||
| 				}, | |||
| 				"z": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				} | |||
| 			  } | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//delete the circuit breaker configuration for the size of the uploaded file of bucket x,y
 | |||
| 		{ | |||
| 			args: strings.Split("-buckets x,y -type MB -actions Write -delete", " "), | |||
| 			result: `{ | |||
| 			  "global": { | |||
| 				"actions": { | |||
| 				  "Read:Count": "500", | |||
| 				  "Write:Count": "200" | |||
| 				} | |||
| 			  }, | |||
| 			  "buckets": { | |||
| 				"x": { | |||
| 				  "enabled": true | |||
| 				}, | |||
| 				"y": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				}, | |||
| 				"z": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				} | |||
| 			  } | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//enable global circuit breaker config (without -disable flag)
 | |||
| 		{ | |||
| 			args: strings.Split("-global", " "), | |||
| 			result: `{ | |||
| 			  "global": { | |||
| 				"enabled": true, | |||
| 				"actions": { | |||
| 				  "Read:Count": "500", | |||
| 				  "Write:Count": "200" | |||
| 				} | |||
| 			  }, | |||
| 			  "buckets": { | |||
| 				"x": { | |||
| 				  "enabled": true | |||
| 				}, | |||
| 				"y": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				}, | |||
| 				"z": { | |||
| 				  "enabled": true, | |||
| 				  "actions": { | |||
| 					"Read:Count": "200", | |||
| 					"Write:Count": "100" | |||
| 				  } | |||
| 				} | |||
| 			  } | |||
| 			}`, | |||
| 		}, | |||
| 
 | |||
| 		//clear all circuit breaker config
 | |||
| 		{ | |||
| 			args: strings.Split("-delete", " "), | |||
| 			result: `{ | |||
| 			 | |||
| 			}`, | |||
| 		}, | |||
| 	} | |||
| ) | |||
| 
 | |||
| func TestCircuitBreakerShell(t *testing.T) { | |||
| 	var writeBuf bytes.Buffer | |||
| 	cmd := &commandS3CircuitBreaker{} | |||
| 	LoadConfig = func(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error { | |||
| 		_, err := buf.Write(writeBuf.Bytes()) | |||
| 		if err != nil { | |||
| 			return err | |||
| 		} | |||
| 		writeBuf.Reset() | |||
| 		return nil | |||
| 	} | |||
| 
 | |||
| 	for i, tc := range TestCases { | |||
| 		err := cmd.Do(tc.args, nil, &writeBuf) | |||
| 		if err != nil { | |||
| 			t.Fatal(err) | |||
| 		} | |||
| 		if i != 0 { | |||
| 			result := writeBuf.String() | |||
| 
 | |||
| 			actual := make(map[string]interface{}) | |||
| 			err := json.Unmarshal([]byte(result), &actual) | |||
| 			if err != nil { | |||
| 				t.Error(err) | |||
| 			} | |||
| 
 | |||
| 			expect := make(map[string]interface{}) | |||
| 			err = json.Unmarshal([]byte(result), &expect) | |||
| 			if err != nil { | |||
| 				t.Error(err) | |||
| 			} | |||
| 			if !reflect.DeepEqual(actual, expect) { | |||
| 				t.Fatal("result of s3 circuit breaker shell command is unexpect!") | |||
| 			} | |||
| 		} | |||
| 	} | |||
| } | |||
						Write
						Preview
					
					
					Loading…
					
					Cancel
						Save
					
		Reference in new issue