358 lines
9.9 KiB

  1. package shell
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  9. "io"
  10. "strconv"
  11. "strings"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. )
// LoadConfig is the loader that Do calls to fetch the stored circuit breaker
// configuration. It defaults to loadConfig.
// NOTE(review): presumably exported as a variable so tests can substitute a
// stub loader — confirm against the package's test files.
var LoadConfig = loadConfig
  15. func init() {
  16. Commands = append(Commands, &commandS3CircuitBreaker{})
  17. }
  18. type commandS3CircuitBreaker struct {
  19. }
  20. func (c *commandS3CircuitBreaker) Name() string {
  21. return "s3.circuitBreaker"
  22. }
  23. func (c *commandS3CircuitBreaker) Help() string {
  24. return `configure and apply s3 circuit breaker options for each bucket
  25. # examples
  26. # add circuit breaker config for global
  27. s3.circuitBreaker -global -type count -actions Read,Write -values 500,200 -apply
  28. # disable global config
  29. s3.circuitBreaker -global -disable -apply
  30. # add circuit breaker config for buckets x,y,z
  31. s3.circuitBreaker -buckets x,y,z -type count -actions Read,Write -values 200,100 -apply
  32. # disable circuit breaker config of x
  33. s3.circuitBreaker -buckets x -disable -apply
  34. # delete circuit breaker config of x
  35. s3.circuitBreaker -buckets x -delete -apply
  36. # clear all circuit breaker config
  37. s3.circuitBreaker -delete -apply
  38. `
  39. }
  40. func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  41. dir := s3_constants.CircuitBreakerConfigDir
  42. file := s3_constants.CircuitBreakerConfigFile
  43. s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  44. buckets := s3CircuitBreakerCommand.String("buckets", "", "the bucket name(s) to configure, eg: -buckets x,y,z")
  45. global := s3CircuitBreakerCommand.Bool("global", false, "configure global circuit breaker")
  46. actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin")
  47. limitType := s3CircuitBreakerCommand.String("type", "", "'Count' or 'MB'; Count represents the number of simultaneous requests, and MB represents the content size of all simultaneous requests")
  48. values := s3CircuitBreakerCommand.String("values", "", "comma separated values")
  49. disabled := s3CircuitBreakerCommand.Bool("disable", false, "disable global or buckets circuit breaker")
  50. deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete circuit breaker config")
  51. apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration")
  52. if err = s3CircuitBreakerCommand.Parse(args); err != nil {
  53. return nil
  54. }
  55. var buf bytes.Buffer
  56. err = LoadConfig(commandEnv, dir, file, &buf)
  57. if err != nil {
  58. return err
  59. }
  60. cbCfg := &s3_pb.S3CircuitBreakerConfig{
  61. Buckets: make(map[string]*s3_pb.S3CircuitBreakerOptions),
  62. }
  63. if buf.Len() > 0 {
  64. if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil {
  65. return err
  66. }
  67. }
  68. if *deleted {
  69. cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true)
  70. if err != nil {
  71. return err
  72. }
  73. if len(cmdBuckets) <= 0 && !*global {
  74. if len(cmdActions) > 0 {
  75. deleteGlobalActions(cbCfg, cmdActions, limitType)
  76. if cbCfg.Buckets != nil {
  77. var allBuckets []string
  78. for bucket := range cbCfg.Buckets {
  79. allBuckets = append(allBuckets, bucket)
  80. }
  81. deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType)
  82. }
  83. } else {
  84. cbCfg.Global = nil
  85. cbCfg.Buckets = nil
  86. }
  87. } else {
  88. if len(cmdBuckets) > 0 {
  89. deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType)
  90. }
  91. if *global {
  92. deleteGlobalActions(cbCfg, cmdActions, nil)
  93. }
  94. }
  95. } else {
  96. cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, *disabled)
  97. if err != nil {
  98. return err
  99. }
  100. if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global {
  101. return fmt.Errorf("one of -global and -buckets must be specified")
  102. }
  103. if len(*buckets) > 0 {
  104. for _, bucket := range cmdBuckets {
  105. var cbOptions *s3_pb.S3CircuitBreakerOptions
  106. var exists bool
  107. if cbOptions, exists = cbCfg.Buckets[bucket]; !exists {
  108. cbOptions = &s3_pb.S3CircuitBreakerOptions{}
  109. cbCfg.Buckets[bucket] = cbOptions
  110. }
  111. cbOptions.Enabled = !*disabled
  112. if len(cmdActions) > 0 {
  113. err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType)
  114. if err != nil {
  115. return err
  116. }
  117. }
  118. if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled {
  119. delete(cbCfg.Buckets, bucket)
  120. }
  121. }
  122. }
  123. if *global {
  124. globalOptions := cbCfg.Global
  125. if globalOptions == nil {
  126. globalOptions = &s3_pb.S3CircuitBreakerOptions{Actions: make(map[string]int64, len(cmdActions))}
  127. cbCfg.Global = globalOptions
  128. }
  129. globalOptions.Enabled = !*disabled
  130. if len(cmdActions) > 0 {
  131. err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType)
  132. if err != nil {
  133. return err
  134. }
  135. }
  136. if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled {
  137. cbCfg.Global = nil
  138. }
  139. }
  140. }
  141. buf.Reset()
  142. err = filer.ProtoToText(&buf, cbCfg)
  143. if err != nil {
  144. return err
  145. }
  146. _, _ = fmt.Fprintf(writer, string(buf.Bytes()))
  147. _, _ = fmt.Fprintln(writer)
  148. if *apply {
  149. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  150. return filer.SaveInsideFiler(client, dir, file, buf.Bytes())
  151. }); err != nil {
  152. return err
  153. }
  154. }
  155. return nil
  156. }
  157. func loadConfig(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error {
  158. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  159. return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, buf)
  160. }); err != nil && err != filer_pb.ErrNotFound {
  161. return err
  162. }
  163. return nil
  164. }
  165. func insertOrUpdateValues(cbOptions *s3_pb.S3CircuitBreakerOptions, cmdActions []string, cmdValues []int64, limitType *string) error {
  166. if len(*limitType) == 0 {
  167. return fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  168. }
  169. if cbOptions.Actions == nil {
  170. cbOptions.Actions = make(map[string]int64, len(cmdActions))
  171. }
  172. if len(cmdValues) > 0 {
  173. for i, action := range cmdActions {
  174. cbOptions.Actions[s3_constants.Concat(action, *limitType)] = cmdValues[i]
  175. }
  176. }
  177. return nil
  178. }
  179. func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  180. if cbCfg.Buckets == nil {
  181. return
  182. }
  183. if len(cmdActions) == 0 {
  184. for _, bucket := range cmdBuckets {
  185. delete(cbCfg.Buckets, bucket)
  186. }
  187. } else {
  188. for _, bucket := range cmdBuckets {
  189. if cbOption, ok := cbCfg.Buckets[bucket]; ok {
  190. if len(cmdActions) > 0 && cbOption.Actions != nil {
  191. for _, action := range cmdActions {
  192. delete(cbOption.Actions, s3_constants.Concat(action, *limitType))
  193. }
  194. }
  195. if len(cbOption.Actions) == 0 && !cbOption.Enabled {
  196. delete(cbCfg.Buckets, bucket)
  197. }
  198. }
  199. }
  200. }
  201. if len(cbCfg.Buckets) == 0 {
  202. cbCfg.Buckets = nil
  203. }
  204. }
  205. func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  206. globalOptions := cbCfg.Global
  207. if globalOptions == nil {
  208. return
  209. }
  210. if len(cmdActions) == 0 && globalOptions.Actions != nil {
  211. globalOptions.Actions = nil
  212. return
  213. } else {
  214. for _, action := range cmdActions {
  215. delete(globalOptions.Actions, s3_constants.Concat(action, *limitType))
  216. }
  217. }
  218. if len(globalOptions.Actions) == 0 && !globalOptions.Enabled {
  219. cbCfg.Global = nil
  220. }
  221. }
  222. func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, parseValues bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) {
  223. if len(*buckets) > 0 {
  224. cmdBuckets = strings.Split(*buckets, ",")
  225. }
  226. if len(*actions) > 0 {
  227. cmdActions = strings.Split(*actions, ",")
  228. //check action valid
  229. for _, action := range cmdActions {
  230. var found bool
  231. for _, allowedAction := range s3_constants.AllowedActions {
  232. if allowedAction == action {
  233. found = true
  234. }
  235. }
  236. if !found {
  237. return nil, nil, nil, fmt.Errorf("value(%s) of flag[-action] not valid, allowed actions: %v", *actions, s3_constants.AllowedActions)
  238. }
  239. }
  240. }
  241. if !parseValues {
  242. if len(cmdActions) < 0 {
  243. for _, action := range s3_constants.AllowedActions {
  244. cmdActions = append(cmdActions, action)
  245. }
  246. }
  247. if len(*limitType) > 0 {
  248. switch *limitType {
  249. case s3_constants.LimitTypeCount:
  250. elements := strings.Split(*values, ",")
  251. if len(cmdActions) != len(elements) {
  252. if len(elements) != 1 || len(elements) == 0 {
  253. return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal")
  254. }
  255. v, err := strconv.Atoi(elements[0])
  256. if err != nil {
  257. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  258. }
  259. for range cmdActions {
  260. cmdValues = append(cmdValues, int64(v))
  261. }
  262. } else {
  263. for _, value := range elements {
  264. v, err := strconv.Atoi(value)
  265. if err != nil {
  266. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  267. }
  268. cmdValues = append(cmdValues, int64(v))
  269. }
  270. }
  271. case s3_constants.LimitTypeBytes:
  272. elements := strings.Split(*values, ",")
  273. if len(cmdActions) != len(elements) {
  274. if len(elements) != 1 || len(elements) == 0 {
  275. return nil, nil, nil, fmt.Errorf("values count of -actions and -values not equal")
  276. }
  277. v, err := parseMBToBytes(elements[0])
  278. if err != nil {
  279. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  280. }
  281. for range cmdActions {
  282. cmdValues = append(cmdValues, v)
  283. }
  284. } else {
  285. for _, value := range elements {
  286. v, err := parseMBToBytes(value)
  287. if err != nil {
  288. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  289. }
  290. cmdValues = append(cmdValues, v)
  291. }
  292. }
  293. default:
  294. return nil, nil, nil, fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  295. }
  296. } else {
  297. *limitType = ""
  298. }
  299. }
  300. return cmdBuckets, cmdActions, cmdValues, nil
  301. }
  302. func parseMBToBytes(valStr string) (int64, error) {
  303. v, err := strconv.Atoi(valStr)
  304. v *= 1024 * 1024
  305. return int64(v), err
  306. }