You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

353 lines
9.8 KiB

  1. package shell
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "github.com/alecthomas/units"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
  9. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
  10. "io"
  11. "strconv"
  12. "strings"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. )
  15. var LoadConfig = loadConfig
  16. func init() {
  17. Commands = append(Commands, &commandS3CircuitBreaker{})
  18. }
  19. type commandS3CircuitBreaker struct {
  20. }
  21. func (c *commandS3CircuitBreaker) Name() string {
  22. return "s3.circuitBreaker"
  23. }
  24. func (c *commandS3CircuitBreaker) Help() string {
  25. return `configure and apply s3 circuit breaker options for each bucket
  26. # examples
  27. # add circuit breaker config for global
  28. s3.circuitBreaker -global -type count -actions Read,Write -values 500,200 -apply
  29. # disable global config
  30. s3.circuitBreaker -global -disable -apply
  31. # add circuit breaker config for buckets x,y,z
  32. s3.circuitBreaker -buckets x,y,z -type count -actions Read,Write -values 200,100 -apply
  33. # disable circuit breaker config of x
  34. s3.circuitBreaker -buckets x -disable -apply
  35. # delete circuit breaker config of x
  36. s3.circuitBreaker -buckets x -delete -apply
  37. # clear all circuit breaker config
  38. s3.circuitBreaker -delete -apply
  39. `
  40. }
  41. func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  42. dir := s3_constants.CircuitBreakerConfigDir
  43. file := s3_constants.CircuitBreakerConfigFile
  44. s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  45. buckets := s3CircuitBreakerCommand.String("buckets", "", "the bucket name(s) to configure, eg: -buckets x,y,z")
  46. global := s3CircuitBreakerCommand.Bool("global", false, "configure global circuit breaker")
  47. actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin")
  48. limitType := s3CircuitBreakerCommand.String("type", "", "count|bytes simultaneous requests count")
  49. values := s3CircuitBreakerCommand.String("values", "", "comma separated max values,Maximum number of simultaneous requests content length, support byte unit: eg: 1k, 10m, 1g")
  50. disabled := s3CircuitBreakerCommand.Bool("disable", false, "disable global or buckets circuit breaker")
  51. deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete circuit breaker config")
  52. apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration")
  53. if err = s3CircuitBreakerCommand.Parse(args); err != nil {
  54. return nil
  55. }
  56. var buf bytes.Buffer
  57. err = LoadConfig(commandEnv, dir, file, &buf)
  58. if err != nil {
  59. return err
  60. }
  61. cbCfg := &s3_pb.S3CircuitBreakerConfig{
  62. Buckets: make(map[string]*s3_pb.S3CircuitBreakerOptions),
  63. }
  64. if buf.Len() > 0 {
  65. if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil {
  66. return err
  67. }
  68. }
  69. if *deleted {
  70. cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true)
  71. if err != nil {
  72. return err
  73. }
  74. if len(cmdBuckets) <= 0 && !*global {
  75. if len(cmdActions) > 0 {
  76. deleteGlobalActions(cbCfg, cmdActions, limitType)
  77. if cbCfg.Buckets != nil {
  78. var allBuckets []string
  79. for bucket := range cbCfg.Buckets {
  80. allBuckets = append(allBuckets, bucket)
  81. }
  82. deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType)
  83. }
  84. } else {
  85. cbCfg.Global = nil
  86. cbCfg.Buckets = nil
  87. }
  88. } else {
  89. if len(cmdBuckets) > 0 {
  90. deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType)
  91. }
  92. if *global {
  93. deleteGlobalActions(cbCfg, cmdActions, nil)
  94. }
  95. }
  96. } else {
  97. cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, *disabled)
  98. if err != nil {
  99. return err
  100. }
  101. if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global {
  102. return fmt.Errorf("one of -global and -buckets must be specified")
  103. }
  104. if len(*buckets) > 0 {
  105. for _, bucket := range cmdBuckets {
  106. var cbOptions *s3_pb.S3CircuitBreakerOptions
  107. var exists bool
  108. if cbOptions, exists = cbCfg.Buckets[bucket]; !exists {
  109. cbOptions = &s3_pb.S3CircuitBreakerOptions{}
  110. cbCfg.Buckets[bucket] = cbOptions
  111. }
  112. cbOptions.Enabled = !*disabled
  113. if len(cmdActions) > 0 {
  114. err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType)
  115. if err != nil {
  116. return err
  117. }
  118. }
  119. if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled {
  120. delete(cbCfg.Buckets, bucket)
  121. }
  122. }
  123. }
  124. if *global {
  125. globalOptions := cbCfg.Global
  126. if globalOptions == nil {
  127. globalOptions = &s3_pb.S3CircuitBreakerOptions{Actions: make(map[string]int64, len(cmdActions))}
  128. cbCfg.Global = globalOptions
  129. }
  130. globalOptions.Enabled = !*disabled
  131. if len(cmdActions) > 0 {
  132. err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType)
  133. if err != nil {
  134. return err
  135. }
  136. }
  137. if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled {
  138. cbCfg.Global = nil
  139. }
  140. }
  141. }
  142. buf.Reset()
  143. err = filer.ProtoToText(&buf, cbCfg)
  144. if err != nil {
  145. return err
  146. }
  147. _, _ = fmt.Fprintf(writer, string(buf.Bytes()))
  148. _, _ = fmt.Fprintln(writer)
  149. if *apply {
  150. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  151. return filer.SaveInsideFiler(client, dir, file, buf.Bytes())
  152. }); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. func loadConfig(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error {
  159. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  160. return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, buf)
  161. }); err != nil && err != filer_pb.ErrNotFound {
  162. return err
  163. }
  164. return nil
  165. }
  166. func insertOrUpdateValues(cbOptions *s3_pb.S3CircuitBreakerOptions, cmdActions []string, cmdValues []int64, limitType *string) error {
  167. if len(*limitType) == 0 {
  168. return fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  169. }
  170. if cbOptions.Actions == nil {
  171. cbOptions.Actions = make(map[string]int64, len(cmdActions))
  172. }
  173. if len(cmdValues) > 0 {
  174. for i, action := range cmdActions {
  175. cbOptions.Actions[s3_constants.Concat(action, *limitType)] = cmdValues[i]
  176. }
  177. }
  178. return nil
  179. }
  180. func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  181. if cbCfg.Buckets == nil {
  182. return
  183. }
  184. if len(cmdActions) == 0 {
  185. for _, bucket := range cmdBuckets {
  186. delete(cbCfg.Buckets, bucket)
  187. }
  188. } else {
  189. for _, bucket := range cmdBuckets {
  190. if cbOption, ok := cbCfg.Buckets[bucket]; ok {
  191. if len(cmdActions) > 0 && cbOption.Actions != nil {
  192. for _, action := range cmdActions {
  193. delete(cbOption.Actions, s3_constants.Concat(action, *limitType))
  194. }
  195. }
  196. if len(cbOption.Actions) == 0 && !cbOption.Enabled {
  197. delete(cbCfg.Buckets, bucket)
  198. }
  199. }
  200. }
  201. }
  202. if len(cbCfg.Buckets) == 0 {
  203. cbCfg.Buckets = nil
  204. }
  205. }
  206. func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  207. globalOptions := cbCfg.Global
  208. if globalOptions == nil {
  209. return
  210. }
  211. if len(cmdActions) == 0 && globalOptions.Actions != nil {
  212. globalOptions.Actions = nil
  213. return
  214. } else {
  215. for _, action := range cmdActions {
  216. delete(globalOptions.Actions, s3_constants.Concat(action, *limitType))
  217. }
  218. }
  219. if len(globalOptions.Actions) == 0 && !globalOptions.Enabled {
  220. cbCfg.Global = nil
  221. }
  222. }
  223. func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, parseValues bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) {
  224. if len(*buckets) > 0 {
  225. cmdBuckets = strings.Split(*buckets, ",")
  226. }
  227. if len(*actions) > 0 {
  228. cmdActions = strings.Split(*actions, ",")
  229. //check action valid
  230. for _, action := range cmdActions {
  231. var found bool
  232. for _, allowedAction := range s3_constants.AllowedActions {
  233. if allowedAction == action {
  234. found = true
  235. }
  236. }
  237. if !found {
  238. return nil, nil, nil, fmt.Errorf("value(%s) of flag[-action] not valid, allowed actions: %v", *actions, s3_constants.AllowedActions)
  239. }
  240. }
  241. }
  242. if !parseValues {
  243. if len(cmdActions) < 0 {
  244. for _, action := range s3_constants.AllowedActions {
  245. cmdActions = append(cmdActions, action)
  246. }
  247. }
  248. if len(*limitType) > 0 {
  249. switch *limitType {
  250. case s3_constants.LimitTypeCount:
  251. elements := strings.Split(*values, ",")
  252. if len(cmdActions) != len(elements) {
  253. if len(elements) != 1 || len(elements) == 0 {
  254. return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal")
  255. }
  256. v, err := strconv.Atoi(elements[0])
  257. if err != nil {
  258. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  259. }
  260. for range cmdActions {
  261. cmdValues = append(cmdValues, int64(v))
  262. }
  263. } else {
  264. for _, value := range elements {
  265. v, err := strconv.Atoi(value)
  266. if err != nil {
  267. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  268. }
  269. cmdValues = append(cmdValues, int64(v))
  270. }
  271. }
  272. case s3_constants.LimitTypeBytes:
  273. elements := strings.Split(*values, ",")
  274. if len(cmdActions) != len(elements) {
  275. if len(elements) != 1 || len(elements) == 0 {
  276. return nil, nil, nil, fmt.Errorf("values count of -actions and -values not equal")
  277. }
  278. v, err := units.ParseStrictBytes(elements[0])
  279. if err != nil {
  280. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  281. }
  282. for range cmdActions {
  283. cmdValues = append(cmdValues, v)
  284. }
  285. } else {
  286. for _, value := range elements {
  287. v, err := units.ParseStrictBytes(value)
  288. if err != nil {
  289. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  290. }
  291. cmdValues = append(cmdValues, v)
  292. }
  293. }
  294. default:
  295. return nil, nil, nil, fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  296. }
  297. } else {
  298. *limitType = ""
  299. }
  300. }
  301. return cmdBuckets, cmdActions, cmdValues, nil
  302. }