diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index ef7841725..328968c89 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -7,7 +7,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" - "sync" "time" ) @@ -69,8 +68,10 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { assignment := resp.GetAssignment() if assignment != nil { glog.V(0).Infof("subscriber %s receive assignment: %v", sub.ContentConfig.Topic, assignment) + for _, assignedPartition := range assignment.PartitionAssignments { + sub.brokerPartitionAssignmentChan <- assignedPartition + } } - sub.onEachAssignment(assignment) } return nil @@ -84,31 +85,6 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { } } -func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCoordinatorResponse_Assignment) { - if assignment == nil { - return - } - // process each partition, with a concurrency limit - var wg sync.WaitGroup - semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit) - - for _, assigned := range assignment.PartitionAssignments { - wg.Add(1) - semaphore <- struct{}{} - go func(assigned *mq_pb.BrokerPartitionAssignment) { - defer wg.Done() - defer func() { <-semaphore }() - glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) - err := sub.onEachPartition(assigned) - if err != nil { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) - } - }(assigned) - } - - wg.Wait() -} - func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { // connect to the partition broker return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index df62ea674..b788faeb5 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,11 +1,88 @@ package sub_client +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" + "time" +) + +type ProcessorState struct { + +} + // Subscribe subscribes to a topic's specified partitions. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. func (sub *TopicSubscriber) Subscribe() error { + + go sub.startProcessors() + // loop forever sub.doKeepConnectedToSubCoordinator() return nil } + +func (sub *TopicSubscriber) startProcessors() { + // listen to the messages from the sub coordinator + // start one processor per partition + var wg sync.WaitGroup + semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit) + + for assigned := range sub.brokerPartitionAssignmentChan { + wg.Add(1) + semaphore <- struct{}{} + + topicPartition := topic.FromPbPartition(assigned.Partition) + + // wait until no covering partition is still in progress + sub.waitUntilNoOverlappingPartitionInFlight(topicPartition) + + // start a processors + sub.activeProcessorsLock.Lock() + sub.activeProcessors[topicPartition] = &ProcessorState{} + sub.activeProcessorsLock.Unlock() + + go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) { + defer func() { + sub.activeProcessorsLock.Lock() + delete(sub.activeProcessors, topicPartition) + sub.activeProcessorsLock.Unlock() + + <-semaphore + wg.Done() + }() + glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + err := sub.onEachPartition(assigned) + if err != nil { + glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) + } else { + glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + } + }(assigned, topicPartition) + } + + wg.Wait() + +} + +func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) { + foundOverlapping := true + for foundOverlapping { + sub.activeProcessorsLock.Lock() + foundOverlapping = false + for partition, _ := range sub.activeProcessors { + if partition.Overlaps(topicPartition) { + foundOverlapping = true + break + } + } + sub.activeProcessorsLock.Unlock() + if foundOverlapping { + glog.V(0).Infof("subscriber %s/%s waiting for partition %+v to complete", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, topicPartition) + time.Sleep(1 * time.Second) + } + } +} diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 723520c1d..a84791f0a 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -4,6 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc" + "sync" "time" ) @@ -30,23 +31,27 @@ type OnCompletionFunc func() type TopicSubscriber struct { SubscriberConfig *SubscriberConfiguration ContentConfig *ContentConfiguration - ProcessorConfig *ProcessorConfiguration - brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment - OnEachMessageFunc OnEachMessageFunc + ProcessorConfig *ProcessorConfiguration + brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment + OnEachMessageFunc OnEachMessageFunc OnCompletionFunc OnCompletionFunc bootstrapBrokers []string waitForMoreMessage bool alreadyProcessedTsNs int64 + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber { return &TopicSubscriber{ - SubscriberConfig: subscriber, - ContentConfig: content, - ProcessorConfig: &processor, - bootstrapBrokers: bootstrapBrokers, - waitForMoreMessage: true, - alreadyProcessedTsNs: content.StartTime.UnixNano(), + SubscriberConfig: subscriber, + ContentConfig: content, + ProcessorConfig: &processor, + brokerPartitionAssignmentChan: make(chan *mq_pb.BrokerPartitionAssignment, 1024), + bootstrapBrokers: bootstrapBrokers, + waitForMoreMessage: true, + alreadyProcessedTsNs: content.StartTime.UnixNano(), + activeProcessors: make(map[topic.Partition]*ProcessorState), } } diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go index 45b55c43b..ba1accce1 100644 --- a/weed/mq/topic/partition.go +++ b/weed/mq/topic/partition.go @@ -71,3 +71,13 @@ func (partition Partition) ToPbPartition() *mq_pb.Partition { UnixTimeNs: partition.UnixTimeNs, } } + +func (partition Partition) Overlaps(partition2 Partition) bool { + if partition.RangeStart >= partition2.RangeStop { + return false + } + if partition.RangeStop <= partition2.RangeStart { + return false + } + return true +}