package broker

import (
	"context"
	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"sort"
	"sync"
)

func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
	ret := &mq_pb.FindBrokerLeaderResponse{}
	err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType:   cluster.BrokerType,
			FilerGroup:   request.FilerGroup,
			IsLeaderOnly: true,
		})
		if err != nil {
			return err
		}
		if len(resp.ClusterNodes) == 0 {
			return nil
		}
		ret.Broker = resp.ClusterNodes[0].Address
		return nil
	})
	return ret, err
}

func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
	ret := &mq_pb.CheckSegmentStatusResponse{}
	// TODO add in memory active segment
	return ret, nil
}

func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
	ret := &mq_pb.CheckBrokerLoadResponse{}
	// TODO read broker's load
	return ret, nil
}

func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
	ret := &mq_pb.AssignSegmentBrokersResponse{}
	segment := mq.FromPbSegment(request.Segment)

	// check existing segment locations on filer
	existingBrokers, err := broker.checkSegmentOnFiler(segment)
	if err != nil {
		return ret, err
	}

	if len(existingBrokers) > 0 {
		// good if the segment is still on the brokers
		isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
		if err != nil {
			return ret, err
		}
		if isActive {
			for _, broker := range existingBrokers {
				ret.Brokers = append(ret.Brokers, string(broker))
			}
			return ret, nil
		}
	}

	// randomly pick up to 10 brokers, and find the ones with the lightest load
	selectedBrokers, err := broker.selectBrokers()
	if err != nil {
		return ret, err
	}

	// save the allocated brokers info for this segment on the filer
	if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
		return ret, err
	}

	for _, broker := range selectedBrokers {
		ret.Brokers = append(ret.Brokers, string(broker))
	}
	return ret, nil
}

func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
	var wg sync.WaitGroup

	for _, candidate := range brokers {
		wg.Add(1)
		go func(candidate pb.ServerAddress) {
			defer wg.Done()
			broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
				resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
					Segment: &mq_pb.Segment{
						Namespace: string(segment.Topic.Namespace),
						Topic:     segment.Topic.Name,
						Id:        segment.Id,
					},
				})
				if checkErr != nil {
					err = checkErr
					glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
					return nil
				}
				if resp.IsActive == false {
					active = false
				}
				return nil
			})
		}(candidate)
	}
	wg.Wait()
	return
}

func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
	candidates, err := broker.selectCandidatesFromMaster(10)
	if err != nil {
		return
	}
	brokers, err = broker.pickLightestCandidates(candidates, 3)
	return
}

func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
	err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.BrokerType,
			FilerGroup: broker.option.FilerGroup,
			Limit:      limit,
		})
		if err != nil {
			return err
		}
		if len(resp.ClusterNodes) == 0 {
			return nil
		}
		for _, node := range resp.ClusterNodes {
			candidates = append(candidates, pb.ServerAddress(node.Address))
		}
		return nil
	})
	return
}

type CandidateStatus struct {
	address      pb.ServerAddress
	messageCount int64
	bytesCount   int64
	load         int64
}

func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {

	if len(candidates) <= limit {
		return candidates, nil
	}

	candidateStatuses, err := broker.checkBrokerStatus(candidates)
	if err != nil {
		return nil, err
	}

	sort.Slice(candidateStatuses, func(i, j int) bool {
		return candidateStatuses[i].load < candidateStatuses[j].load
	})

	for i, candidate := range candidateStatuses {
		if i >= limit {
			break
		}
		selected = append(selected, candidate.address)
	}

	return
}

func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {

	candidateStatuses = make([]*CandidateStatus, len(candidates))
	var wg sync.WaitGroup
	for i, candidate := range candidates {
		wg.Add(1)
		go func(i int, candidate pb.ServerAddress) {
			defer wg.Done()
			err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
				resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
				if checkErr != nil {
					err = checkErr
					return err
				}
				candidateStatuses[i] = &CandidateStatus{
					address:      candidate,
					messageCount: resp.MessageCount,
					bytesCount:   resp.BytesCount,
					load:         resp.MessageCount + resp.BytesCount/(64*1024),
				}
				return nil
			})
		}(i, candidate)
	}
	wg.Wait()
	return
}