package coordinator

import (
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"sync"
)

// SetMinMaxActiveInstances records the lower and upper bounds on the number of
// active instances for this consumer group.
//
// The values are stored as given, with no validation. CoordinateIfNeeded treats
// a minimum <= 0 as "activate every idle instance" and only enforces the
// maximum when it is > 0.
func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
	cg.MinimumActiveInstances, cg.MaximumActiveInstances = min, max
}

// AddConsumerGroupInstance creates a new instance for clientId, stores it in
// cg.ConsumerGroupInstances under that key, and returns it.
//
// The new instance has no partition assigned; CoordinateIfNeeded counts such
// instances as idle.
// NOTE(review): an existing entry with the same clientId is presumably
// replaced by Set — confirm against the map implementation.
func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
	instance := &ConsumerGroupInstance{ClientId: clientId}
	cg.ConsumerGroupInstances.Set(clientId, instance)
	return instance
}

// RemoveConsumerGroupInstance removes the instance stored under clientId from
// cg.ConsumerGroupInstances.
//
// It does not trigger a rebalance itself; nothing here stops the instance or
// reassigns its partition.
func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
	cg.ConsumerGroupInstances.Remove(clientId)
}

// CoordinateIfNeeded inspects the group's instances and triggers a rebalance
// via doCoordinate when the number of active instances should change.
//
// An instance is "active" when it has a partition and "idle" when its
// partition is nil. The desired change is computed as follows:
//   - with idle instances and no minimum (<= 0): activate all idle instances;
//   - with idle instances, fewer active than the minimum, and enough instances
//     in total to reach it: grow to exactly the minimum;
//   - with a maximum (> 0) that the active count already exceeds: shrink to
//     the maximum (this overrides any growth computed above).
//
// If the resulting change is zero, nothing happens.
func (cg *ConsumerGroup) CoordinateIfNeeded() {
	var idle, active int32
	for entry := range cg.ConsumerGroupInstances.IterBuffered() {
		if entry.Val.Partition != nil {
			active++
			continue
		}
		// no partition yet: this instance is waiting for an assignment
		idle++
	}

	var change int32
	switch {
	case idle == 0:
		// nobody is waiting for a partition, so there is nothing to grow
	case cg.MinimumActiveInstances <= 0:
		// no minimum configured: hand a partition to every idle instance
		change = idle
	case active < cg.MinimumActiveInstances && active+idle >= cg.MinimumActiveInstances:
		// below the minimum and enough instances exist to reach it
		change = cg.MinimumActiveInstances - active
	}

	// NOTE(review): the maximum is only consulted when the active count
	// already exceeds it; the growth cases above do not cap their target.
	if cg.MaximumActiveInstances > 0 && active > cg.MaximumActiveInstances {
		// too many active instances: negative change shrinks to the maximum
		change = cg.MaximumActiveInstances - active
	}

	if change != 0 {
		cg.doCoordinate(active + change)
	}
}

// doCoordinate rebalances the group so that at most target instances hold a
// partition.
//
// It runs in two phases, each waiting for its goroutines to finish:
//  1. every instance that currently holds a partition gets a "stop" goroutine
//     (the goroutine bodies are still placeholders);
//  2. topic.SplitPartitions(target) produces the new partitions, which are
//     handed out to instances in iteration order, each followed by a "start"
//     goroutine (also a placeholder).
//
// CoordinateIfNeeded may pass a target smaller than the number of registered
// instances (when growing only to the minimum, or shrinking to the maximum).
// Instances beyond the available partitions have their partition cleared so
// they count as idle, instead of indexing past the end of the partition slice.
func (cg *ConsumerGroup) doCoordinate(target int32) {
	// stop existing instances from processing
	var wg sync.WaitGroup
	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
		if cgi.Val.Partition == nil {
			continue
		}
		wg.Add(1)
		go func(instance *ConsumerGroupInstance) {
			defer wg.Done()
			// stop processing
			// flush internal state
			// wait for all messages to be processed
			// close the connection
		}(cgi.Val)
	}
	wg.Wait()

	partitions := topic.SplitPartitions(target)

	// assign partitions to new instances
	next := 0
	for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
		if next >= len(partitions) {
			// More instances than partitions: leave this one idle rather than
			// reading partitions[next] out of range. Keep iterating so every
			// surplus instance drops its old partition.
			cgi.Val.Partition = nil
			continue
		}
		cgi.Val.Partition = partitions[next]
		next++
		wg.Add(1)
		go func(instance *ConsumerGroupInstance) {
			defer wg.Done()
			// start processing
			// start consuming from the last offset
		}(cgi.Val)
	}
	wg.Wait()
}