package broker

import (
	"context"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"sync"
)

// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
	ret := &mq_pb.AssignTopicPartitionsResponse{}

	// drain existing topic partition subscriptions
	for _, assignment := range request.BrokerPartitionAssignments {
		t := topic.FromPbTopic(request.Topic)
		partition := topic.FromPbPartition(assignment.Partition)
		b.accessLock.Lock()
		if request.IsDraining {
			// TODO drain existing topic partition subscriptions
			b.localTopicManager.RemoveLocalPartition(t, partition)
		} else {
			var localPartition *topic.LocalPartition
			if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
				localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
				b.localTopicManager.AddLocalPartition(t, localPartition)
			}
		}
		b.accessLock.Unlock()
	}

	// if is leader, notify the followers to drain existing topic partition subscriptions
	if request.IsLeader {
		for _, brokerPartition := range request.BrokerPartitionAssignments {
			for _, follower := range brokerPartition.FollowerBrokers {
				err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
					_, err := client.AssignTopicPartitions(context.Background(), request)
					return err
				})
				if err != nil {
					return ret, err
				}
			}
		}
	}

	glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
	return ret, nil
}

// called by broker leader to drain existing partitions.
// new/updated partitions will be detected by broker from the filer
func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
	// notify the brokers to create the topic partitions in parallel
	var wg sync.WaitGroup
	for _, bpa := range assignments {
		wg.Add(1)
		go func(bpa *mq_pb.BrokerPartitionAssignment) {
			defer wg.Done()
			if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
				_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
					Topic: t,
					BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
						{
							Partition: bpa.Partition,
						},
					},
					IsLeader:   true,
					IsDraining: !isAdd,
				})
				if doCreateErr != nil {
					if !isAdd {
						return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
					} else {
						return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
					}
				}
				brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
				if !found {
					brokerStats = pub_balancer.NewBrokerStats()
					if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
						brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
					}
				}
				brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
				return nil
			}); doCreateErr != nil {
				glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
			}
		}(bpa)
	}
	wg.Wait()

	return nil
}