7 changed files with 295 additions and 110 deletions
-
8weed/s3api/auth_credentials_subscribe.go
-
9weed/s3api/s3_config/s3_config.go
-
57weed/s3api/s3api_circuit_breaker.go
-
97weed/s3api/s3api_circuit_breaker_test.go
-
81weed/s3api/s3api_server.go
-
84weed/shell/command_s3_circuitbreaker.go
-
69weed/shell/command_s3_circuitbreaker_test.go
@ -1,11 +1,14 @@ |
|||
package config |
|||
package s3_config |
|||
|
|||
import "strings" |
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" |
|||
"strings" |
|||
) |
|||
|
|||
var ( |
|||
CircuitBreakerConfigDir = "/etc/s3" |
|||
CircuitBreakerConfigFile = "circuit_breaker.json" |
|||
AllowedActions = []string{"Read", "Write", "List", "Tagging", "Admin"} |
|||
AllowedActions = []string{s3_constants.ACTION_READ, s3_constants.ACTION_WRITE, s3_constants.ACTION_LIST, s3_constants.ACTION_TAGGING, s3_constants.ACTION_ADMIN} |
|||
LimitTypeCount = "count" |
|||
LimitTypeBytes = "bytes" |
|||
Separator = ":" |
@ -0,0 +1,97 @@ |
|||
package s3api |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/s3_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/s3api/s3_config" |
|||
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" |
|||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err" |
|||
"go.uber.org/atomic" |
|||
"net/http" |
|||
"sync" |
|||
"testing" |
|||
) |
|||
|
|||
type TestLimitCase struct { |
|||
actionName string |
|||
limitType string |
|||
bucketLimitValue int64 |
|||
globalLimitValue int64 |
|||
|
|||
routineCount int |
|||
reqBytes int64 |
|||
|
|||
successCount int64 |
|||
} |
|||
|
|||
var ( |
|||
bucket = "/test" |
|||
action = s3_constants.ACTION_READ |
|||
TestLimitCases = []*TestLimitCase{ |
|||
{action, s3_config.LimitTypeCount, 5, 5, 6, 1024, 5}, |
|||
{action, s3_config.LimitTypeCount, 6, 6, 6, 1024, 6}, |
|||
{action, s3_config.LimitTypeCount, 5, 6, 6, 1024, 5}, |
|||
{action, s3_config.LimitTypeBytes, 1024, 1024, 6, 200, 5}, |
|||
{action, s3_config.LimitTypeBytes, 1200, 1200, 6, 200, 6}, |
|||
{action, s3_config.LimitTypeBytes, 11990, 11990, 60, 200, 59}, |
|||
{action, s3_config.LimitTypeBytes, 11790, 11990, 60, 200, 58}, |
|||
} |
|||
) |
|||
|
|||
func TestLimit(t *testing.T) { |
|||
for _, tc := range TestLimitCases { |
|||
circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{ |
|||
Global: &s3_pb.CbOptions{ |
|||
Enabled: true, |
|||
Actions: map[string]int64{ |
|||
s3_config.Concat(tc.actionName, tc.limitType): tc.globalLimitValue, |
|||
}, |
|||
}, |
|||
Buckets: map[string]*s3_pb.CbOptions{ |
|||
bucket: { |
|||
Enabled: true, |
|||
Actions: map[string]int64{ |
|||
s3_config.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue, |
|||
}, |
|||
}, |
|||
}, |
|||
} |
|||
circuitBreaker := &CircuitBreaker{ |
|||
counters: make(map[string]*atomic.Int64), |
|||
limitations: make(map[string]int64), |
|||
} |
|||
err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig) |
|||
if err != nil { |
|||
t.Fatal(err) |
|||
} |
|||
|
|||
successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: tc.reqBytes}) |
|||
if successCount != tc.successCount { |
|||
t.Errorf("successCount not equal, expect=%d, actual=%d", tc.successCount, successCount) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request) int64 { |
|||
var successCounter atomic.Int64 |
|||
resultCh := make(chan []func(), routineCount) |
|||
var wg sync.WaitGroup |
|||
for i := 0; i < routineCount; i++ { |
|||
wg.Add(1) |
|||
go func() { |
|||
defer wg.Done() |
|||
rollbackFn, errCode := circuitBreaker.limit(r, bucket, action) |
|||
if errCode == s3err.ErrNone { |
|||
successCounter.Inc() |
|||
} |
|||
resultCh <- rollbackFn |
|||
}() |
|||
} |
|||
wg.Wait() |
|||
close(resultCh) |
|||
for fns := range resultCh { |
|||
for _, fn := range fns { |
|||
fn() |
|||
} |
|||
} |
|||
return successCounter.Load() |
|||
} |
@ -1,7 +1,74 @@ |
|||
package shell |
|||
|
|||
import "testing" |
|||
import ( |
|||
"bytes" |
|||
"strings" |
|||
"testing" |
|||
) |
|||
|
|||
type Case struct { |
|||
args []string |
|||
result string |
|||
} |
|||
|
|||
var ( |
|||
TestCases = []*Case{ |
|||
//add circuit breaker config for global
|
|||
{ |
|||
args: strings.Split("-global -type count -actions Read,Write -values 500,200", " "), |
|||
result: "{\n \"global\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"500\",\n \"Write:count\": \"200\"\n }\n }\n}\n", |
|||
}, |
|||
//disable global config
|
|||
{ |
|||
args: strings.Split("-global -disable", " "), |
|||
result: "{\n \"global\": {\n \"actions\": {\n \"Read:count\": \"500\",\n \"Write:count\": \"200\"\n }\n }\n}\n", |
|||
}, |
|||
//add circuit breaker config for buckets x,y,z
|
|||
{ |
|||
args: strings.Split("-buckets x,y,z -type count -actions Read,Write -values 200,100", " "), |
|||
result: "{\n \"global\": {\n \"actions\": {\n \"Read:count\": \"500\",\n \"Write:count\": \"200\"\n }\n },\n \"buckets\": {\n \"x\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n },\n \"y\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n },\n \"z\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n }\n }\n}\n", |
|||
}, |
|||
//disable circuit breaker config of x
|
|||
{ |
|||
args: strings.Split("-buckets x -disable", " "), |
|||
result: "{\n \"global\": {\n \"actions\": {\n \"Read:count\": \"500\",\n \"Write:count\": \"200\"\n }\n },\n \"buckets\": {\n \"x\": {\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n },\n \"y\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n },\n \"z\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n }\n }\n}\n", |
|||
}, |
|||
//delete circuit breaker config of x
|
|||
{ |
|||
args: strings.Split("-buckets x -delete", " "), |
|||
result: "{\n \"global\": {\n \"actions\": {\n \"Read:count\": \"500\",\n \"Write:count\": \"200\"\n }\n },\n \"buckets\": {\n \"y\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n },\n \"z\": {\n \"enabled\": true,\n \"actions\": {\n \"Read:count\": \"200\",\n \"Write:count\": \"100\"\n }\n }\n }\n}\n", |
|||
}, |
|||
//clear all circuit breaker config
|
|||
{ |
|||
args: strings.Split("-delete", " "), |
|||
result: "{\n\n}\n", |
|||
}, |
|||
} |
|||
) |
|||
|
|||
func TestCircuitBreakerShell(t *testing.T) { |
|||
var writeBuf bytes.Buffer |
|||
cmd := &commandS3CircuitBreaker{} |
|||
LoadConfig = func(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error { |
|||
_, err := buf.Write(writeBuf.Bytes()) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
writeBuf.Reset() |
|||
return nil |
|||
} |
|||
|
|||
for i, tc := range TestCases { |
|||
err := cmd.Do(tc.args, nil, &writeBuf) |
|||
if err != nil { |
|||
t.Fatal(err) |
|||
} |
|||
if i != 0 { |
|||
result := writeBuf.String() |
|||
if result != tc.result { |
|||
t.Fatal("result of s3 circuit breaker shell command is unexpect!") |
|||
} |
|||
|
|||
} |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue