package shell

import (
	"bytes"
	"flag"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"io"
	"math"
)

// init registers the bucket quota enforcement command in the package-level
// Commands list.
func init() {
	Commands = append(Commands, new(commandS3BucketQuotaEnforce))
}

// commandS3BucketQuotaEnforce is the shell command that checks bucket quotas
// and switches buckets between read-only and writable. It carries no state.
type commandS3BucketQuotaEnforce struct{}

// Name returns the name under which the command is invoked.
func (c *commandS3BucketQuotaEnforce) Name() string {
	const commandName = "s3.bucket.quota.enforce"
	return commandName
}

// Help returns the usage text for the command, including an example
// invocation.
func (c *commandS3BucketQuotaEnforce) Help() string {
	return "check quota for all buckets, make the bucket read only if over the limit\n" +
		"\n" +
		"\tExample:\n" +
		"\t\ts3.bucket.quota.enforce -apply\n"
}

// Do compares the size of every bucket directory under the filer buckets path
// with the quota stored on its entry, and updates the read-only flag of the
// matching location rule in the filer configuration.
//
// The per-bucket decisions are always computed and reported on writer. The
// modified configuration is written back to the filer only when the -apply
// flag is given and at least one bucket changed state.
//
// It returns an error when the topology, the buckets path, the filer
// configuration or the bucket listing cannot be read, or when the updated
// configuration cannot be serialized or saved. A filer_pb.ErrNotFound from the
// save step is tolerated and reported as success.
func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	applyQuotaLimit := bucketCommand.Bool("apply", false, "actually change the buckets readonly attribute")
	if err = bucketCommand.Parse(args); err != nil {
		// Deliberately not propagated: with flag.ContinueOnError the flag set
		// has already reported the problem on its own output.
		return nil
	}

	// collect collection information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}
	collectionInfos := make(map[string]*CollectionInfo)
	collectCollectionInfo(topologyInfo, collectionInfos)

	// read buckets path
	var filerBucketsPath string
	filerBucketsPath, err = readFilerBucketsPath(commandEnv)
	if err != nil {
		return fmt.Errorf("read buckets: %w", err)
	}

	// read existing filer configuration
	fc, err := filer.ReadFilerConf(commandEnv.option.FilerAddress, commandEnv.option.GrpcDialOption, commandEnv.MasterClient)
	if err != nil {
		return err
	}

	// process each bucket
	hasConfChanges := false
	err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
		if !entry.IsDirectory {
			return nil
		}
		// The bucket name is used as the collection name; a bucket without a
		// known collection is treated as having size zero.
		collection := entry.Name
		var collectionSize float64
		if collectionInfo, found := collectionInfos[collection]; found {
			collectionSize = collectionInfo.Size
		}
		if c.processEachBucket(fc, filerBucketsPath, entry, writer, collectionSize) {
			hasConfChanges = true
		}
		return nil
	}, "", false, math.MaxUint32)
	if err != nil {
		return fmt.Errorf("list buckets under %v: %w", filerBucketsPath, err)
	}

	// Without -apply this is a dry run: changes were reported, nothing is saved.
	if !hasConfChanges || !*applyQuotaLimit {
		return nil
	}

	// apply the configuration changes
	var buf2 bytes.Buffer
	if err = fc.ToText(&buf2); err != nil {
		// Never save a configuration that failed to serialize.
		return fmt.Errorf("serialize filer conf: %w", err)
	}

	err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
	})
	if err != nil && err != filer_pb.ErrNotFound {
		return err
	}

	// ErrNotFound from the save step is tolerated, so it must not leak out
	// through the named result.
	return nil
}

// processEachBucket decides whether the bucket described by entry must switch
// between read-only and writable, based on collectionSize and entry.Quota.
//
// Rules:
//   - quota not positive: the bucket must be writable;
//   - size above the quota: the bucket must be read-only;
//   - size below the quota: the bucket must be writable;
//   - size exactly equal to the quota: the current state is kept.
//
// When the state has to change, a report is written to writer and the updated
// location rule is added to fc. The return value reports whether fc was
// modified. Nothing is persisted here; saving fc is the caller's job.
func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) {

	locPrefix := filerBucketsPath + "/" + entry.Name + "/"
	locConf := fc.MatchStorageRule(locPrefix)
	locConf.LocationPrefix = locPrefix

	quotaEnabled := entry.Quota > 0

	wantReadOnly := locConf.ReadOnly
	switch {
	case !quotaEnabled:
		wantReadOnly = false
	case collectionSize > float64(entry.Quota):
		wantReadOnly = true
	case collectionSize < float64(entry.Quota):
		wantReadOnly = false
	}

	if wantReadOnly == locConf.ReadOnly {
		return false
	}
	locConf.ReadOnly = wantReadOnly

	fmt.Fprintf(writer, "  %s\tsize:%.0f", entry.Name, collectionSize)
	if quotaEnabled {
		fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, collectionSize*100/float64(entry.Quota))
	} else {
		// A usage percentage is meaningless without a positive quota; dividing
		// by it would print +Inf, NaN or a negative percentage.
		fmt.Fprintf(writer, "\tquota:%d", entry.Quota)
	}
	fmt.Fprintln(writer)

	if locConf.ReadOnly {
		fmt.Fprintf(writer, "    changing bucket %s to read only!\n", entry.Name)
	} else {
		fmt.Fprintf(writer, "    changing bucket %s to writable.\n", entry.Name)
	}
	fc.AddLocationConf(locConf)

	return true
}