package sub_client

import (
	"context"
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"io"
	"reflect"
)

func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
	// connect to the partition broker
	return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {

		subscribeClient, err := client.SubscribeMessage(context.Background())
		if err != nil {
			return fmt.Errorf("create subscribe client: %v", err)
		}

		perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
		if perPartitionConcurrency <= 0 {
			perPartitionConcurrency = 1
		}

		var stopTsNs int64
		if !sub.ContentConfig.StopTime.IsZero() {
			stopTsNs = sub.ContentConfig.StopTime.UnixNano()
		}

		if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
			Message: &mq_pb.SubscribeMessageRequest_Init{
				Init: &mq_pb.SubscribeMessageRequest_InitMessage{
					ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
					ConsumerId:    sub.SubscriberConfig.ConsumerGroupInstanceId,
					Topic:         sub.ContentConfig.Topic.ToPbTopic(),
					PartitionOffset: &mq_pb.PartitionOffset{
						Partition: assigned.Partition,
						StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
						StopTsNs:  stopTsNs,
						StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
					},
					Filter:         sub.ContentConfig.Filter,
					FollowerBroker: assigned.FollowerBroker,
					Concurrency:    perPartitionConcurrency,
				},
			},
		}); err != nil {
			glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
		}

		glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)

		if sub.OnCompletionFunc != nil {
			defer sub.OnCompletionFunc()
		}

		type KeyedOffset struct {
			Key    []byte
			Offset int64
		}

		partitionOffsetChan := make(chan KeyedOffset, 1024)
		defer func() {
			close(partitionOffsetChan)
		}()
		executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))

		go func() {
			for {
				select {
				case <-stopCh:
					subscribeClient.CloseSend()
					return
				case ack, ok := <-partitionOffsetChan:
					if !ok {
						subscribeClient.CloseSend()
						return
					}
					subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
						Message: &mq_pb.SubscribeMessageRequest_Ack{
							Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
								Key:      ack.Key,
								Sequence: ack.Offset,
							},
						},
					})
				}
			}
		}()

		var lastErr error

		for lastErr == nil {
			// glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
			resp, err := subscribeClient.Recv()
			if err != nil {
				return fmt.Errorf("subscribe recv: %v", err)
			}
			if resp.Message == nil {
				glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
				continue
			}
			switch m := resp.Message.(type) {
			case *mq_pb.SubscribeMessageResponse_Data:
				if m.Data.Ctrl != nil {
					glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
					continue
				}
				if len(m.Data.Key) == 0 {
					fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
					continue
				}
				executors.Execute(func() {
					processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
					if processErr == nil {
						partitionOffsetChan <- KeyedOffset{
							Key:    m.Data.Key,
							Offset: m.Data.TsNs,
						}
					} else {
						lastErr = processErr
					}
				})
			case *mq_pb.SubscribeMessageResponse_Ctrl:
				// glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
				if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
					return io.EOF
				}
			}
		}

		return lastErr
	})
}