package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

var (
	client *http.Client
)

func init() {
	client = &http.Client{}
	Commands = append(Commands, &commandFsMergeVolumes{})
}

type commandFsMergeVolumes struct {
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
}

func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}

func (c *commandFsMergeVolumes) Help() string {
	return `re-locate chunks into target volumes and try to clear lighter volumes.
	
	This would help clear half-full volumes and let vacuum system to delete them later.

	fs.mergeVolumes -toVolumeId=y [-fromVolumeId=x] [-apply] /dir/
`
}

func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	dir, err := commandEnv.parseUrl(findInputDirectory(args))
	if err != nil {
		return err
	}
	dir = strings.TrimRight(dir, "/")
	fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
	toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
	apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
	if err = fsMergeVolumesCommand.Parse(args); err != nil {
		return err
	}
	fromVolumeId := needle.VolumeId(*fromVolumeArg)
	toVolumeId := needle.VolumeId(*toVolumeArg)

	if toVolumeId == 0 {
		return fmt.Errorf("volume id can not be zero")
	}

	c.reloadVolumesInfo(commandEnv.MasterClient)

	toVolumeInfo, err := c.getVolumeInfoById(toVolumeId)
	if err != nil {
		return err
	}
	if toVolumeInfo.ReadOnly {
		return fmt.Errorf("volume is readonly: %d", toVolumeId)
	}

	if fromVolumeId != 0 {
		if fromVolumeId == toVolumeId {
			return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
		}
		compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
		if err != nil {
			return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
		}
		if !compatible {
			return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
		}
	}
	defer client.CloseIdleConnections()

	compatibility := make(map[string]bool)

	return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
			if !entry.IsDirectory {
				for _, chunk := range entry.Chunks {
					if chunk.IsChunkManifest {
						fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
						continue
					}
					chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
					if chunkVolumeId == toVolumeId || (fromVolumeId != 0 && fromVolumeId != chunkVolumeId) {
						continue
					}
					cacheKey := fmt.Sprintf("%d-%d", chunkVolumeId, toVolumeId)
					compatible, cached := compatibility[cacheKey]
					if !cached {
						compatible, err = c.volumesAreCompatible(chunkVolumeId, toVolumeId)
						if err != nil {
							_ = fmt.Errorf("cannot determine volumes are compatible: %d and %d", chunkVolumeId, toVolumeId)
							return
						}
						compatibility[cacheKey] = compatible
					}
					if !compatible {
						if fromVolumeId != 0 {
							_ = fmt.Errorf("volumes are incompatible: %d and %d", fromVolumeId, toVolumeId)
							return
						}
						continue
					}
					path := parentPath.Child(entry.Name)

					fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
					if !*apply {
						continue
					}
					if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
						fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
						continue
					}

					if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
						Directory: string(parentPath),
						Entry:     entry,
					}); err != nil {
						fmt.Printf("failed to update %s: %v\n", path, err)
					}
				}
			}
		})
	})
}

func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
	info := c.volumes[vid]
	var err error
	if info == nil {
		err = errors.New("cannot find volume")
	}
	return info, err
}

func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
	srcInfo, err := c.getVolumeInfoById(src)
	if err != nil {
		return false, err
	}
	destInfo, err := c.getVolumeInfoById(dest)
	if err != nil {
		return false, err
	}
	return (srcInfo.Collection == destInfo.Collection &&
		srcInfo.Ttl == destInfo.Ttl &&
		srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
}

func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
	c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)

	return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}
		for _, dc := range volumes.TopologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, node := range rack.DataNodeInfos {
					for _, disk := range node.DiskInfos {
						for _, volume := range disk.VolumeInfos {
							vid := needle.VolumeId(volume.Id)
							if found := c.volumes[vid]; found == nil {
								c.volumes[vid] = volume
							}
						}
					}
				}
			}
		}
		return nil
	})
}

func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
	fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
	toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)

	downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
	if err != nil {
		return err
	}

	downloadURL := fmt.Sprintf("http://%s/%s", downloadURLs[0], fromFid.String())

	uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
	if err != nil {
		return err
	}
	uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())

	resp, reader, err := readUrl(downloadURL)
	if err != nil {
		return err
	}
	defer util.CloseResponse(resp)
	defer reader.Close()

	var filename string

	contentDisposition := resp.Header.Get("Content-Disposition")
	if len(contentDisposition) > 0 {
		idx := strings.Index(contentDisposition, "filename=")
		if idx != -1 {
			filename = contentDisposition[idx+len("filename="):]
			filename = strings.Trim(filename, "\"")
		}
	}

	contentType := resp.Header.Get("Content-Type")
	isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
	md5 := resp.Header.Get("Content-MD5")

	_, err, _ = operation.Upload(reader, &operation.UploadOption{
		UploadUrl:         uploadURL,
		Filename:          filename,
		IsInputCompressed: isCompressed,
		Cipher:            false,
		MimeType:          contentType,
		PairMap:           nil,
		Md5:               md5,
	})
	if err != nil {
		return err
	}
	chunk.Fid.VolumeId = uint32(toVolumeId)
	chunk.FileId = ""

	return nil
}

func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {

	req, err := http.NewRequest("GET", fileUrl, nil)
	if err != nil {
		return nil, nil, err
	}
	req.Header.Add("Accept-Encoding", "gzip")

	r, err := client.Do(req)
	if err != nil {
		return nil, nil, err
	}
	if r.StatusCode >= 400 {
		util.CloseResponse(r)
		return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
	}

	return r, r.Body, nil
}