137 lines
3.7 KiB

3 years ago
  1. package shell
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "io"
  9. "math"
  10. )
  11. func init() {
  12. Commands = append(Commands, &commandS3BucketQuotaEnforce{})
  13. }
  // commandS3BucketQuotaEnforce is the receiver for the
  // "s3.bucket.quota.enforce" shell command. It has no fields.
  type commandS3BucketQuotaEnforce struct{}
  16. func (c *commandS3BucketQuotaEnforce) Name() string {
  17. return "s3.bucket.quota.enforce"
  18. }
  19. func (c *commandS3BucketQuotaEnforce) Help() string {
  20. return `check quota for all buckets, make the bucket read only if over the limit
  21. Example:
  22. s3.bucket.quota.enforce -apply
  23. `
  24. }
  25. func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  26. bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  27. applyQuotaLimit := bucketCommand.Bool("apply", false, "actually change the buckets readonly attribute")
  28. if err = bucketCommand.Parse(args); err != nil {
  29. return nil
  30. }
  31. infoAboutSimulationMode(writer, *applyQuotaLimit, "-apply")
  32. // collect collection information
  33. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  34. if err != nil {
  35. return err
  36. }
  37. collectionInfos := make(map[string]*CollectionInfo)
  38. collectCollectionInfo(topologyInfo, collectionInfos)
  39. // read buckets path
  40. var filerBucketsPath string
  41. filerBucketsPath, err = readFilerBucketsPath(commandEnv)
  42. if err != nil {
  43. return fmt.Errorf("read buckets: %v", err)
  44. }
  45. // read existing filer configuration
  46. fc, err := filer.ReadFilerConf(commandEnv.option.FilerAddress, commandEnv.option.GrpcDialOption, commandEnv.MasterClient)
  47. if err != nil {
  48. return err
  49. }
  50. // process each bucket
  51. hasConfChanges := false
  52. err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  53. if !entry.IsDirectory {
  54. return nil
  55. }
  56. collection := entry.Name
  57. var collectionSize float64
  58. if collectionInfo, found := collectionInfos[collection]; found {
  59. collectionSize = collectionInfo.Size
  60. }
  61. if c.processEachBucket(fc, filerBucketsPath, entry, writer, collectionSize) {
  62. hasConfChanges = true
  63. }
  64. return nil
  65. }, "", false, math.MaxUint32)
  66. if err != nil {
  67. return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
  68. }
  69. // apply the configuration changes
  70. if hasConfChanges && *applyQuotaLimit {
  71. var buf2 bytes.Buffer
  72. fc.ToText(&buf2)
  73. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  74. return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
  75. }); err != nil && err != filer_pb.ErrNotFound {
  76. return err
  77. }
  78. }
  79. return err
  80. }
  81. func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) {
  82. locPrefix := filerBucketsPath + "/" + entry.Name + "/"
  83. locConf := fc.MatchStorageRule(locPrefix)
  84. locConf.LocationPrefix = locPrefix
  85. if entry.Quota > 0 {
  86. if locConf.ReadOnly {
  87. if collectionSize < float64(entry.Quota) {
  88. locConf.ReadOnly = false
  89. hasConfChanges = true
  90. }
  91. } else {
  92. if collectionSize > float64(entry.Quota) {
  93. locConf.ReadOnly = true
  94. hasConfChanges = true
  95. }
  96. }
  97. } else {
  98. if locConf.ReadOnly {
  99. locConf.ReadOnly = false
  100. hasConfChanges = true
  101. }
  102. }
  103. if hasConfChanges {
  104. fmt.Fprintf(writer, " %s\tsize:%.0f", entry.Name, collectionSize)
  105. fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, collectionSize*100/float64(entry.Quota))
  106. fmt.Fprintln(writer)
  107. if locConf.ReadOnly {
  108. fmt.Fprintf(writer, " changing bucket %s to read only!\n", entry.Name)
  109. } else {
  110. fmt.Fprintf(writer, " changing bucket %s to writable.\n", entry.Name)
  111. }
  112. fc.AddLocationConf(locConf)
  113. }
  114. return
  115. }