@@ -30,37 +30,42 @@ func (sub *TopicSubscriber) startProcessors() {
 	var wg sync.WaitGroup
 	semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount)

-	for assigned := range sub.brokerPartitionAssignmentChan {
-		wg.Add(1)
-		semaphore <- struct{}{}
-
-		topicPartition := topic.FromPbPartition(assigned.Partition)
-
-		// wait until no covering partition is still in progress
-		sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
-
-		// start a processors
-		sub.activeProcessorsLock.Lock()
-		sub.activeProcessors[topicPartition] = &ProcessorState{}
-		sub.activeProcessorsLock.Unlock()
+	for message := range sub.brokerPartitionAssignmentChan {
+		if assigned := message.GetAssignment(); assigned != nil {
+			wg.Add(1)
+			semaphore <- struct{}{}
+
+			topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition)
+
+			// wait until no covering partition is still in progress
+			sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
+
+			// start a processors
+			sub.activeProcessorsLock.Lock()
+			sub.activeProcessors[topicPartition] = &ProcessorState{}
+			sub.activeProcessorsLock.Unlock()
+
+			go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
+				defer func() {
+					sub.activeProcessorsLock.Lock()
+					delete(sub.activeProcessors, topicPartition)
+					sub.activeProcessorsLock.Unlock()
+
+					<-semaphore
+					wg.Done()
+				}()
+				glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+				err := sub.onEachPartition(assigned)
+				if err != nil {
+					glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+				} else {
+					glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+				}
+			}(assigned.PartitionAssignment, topicPartition)
+		}
+		if unAssignment := message.GetUnAssignment(); unAssignment != nil {

-		go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
-			defer func() {
-				sub.activeProcessorsLock.Lock()
-				delete(sub.activeProcessors, topicPartition)
-				sub.activeProcessorsLock.Unlock()
-
-				<-semaphore
-				wg.Done()
-			}()
-			glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
-			err := sub.onEachPartition(assigned)
-			if err != nil {
-				glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
-			} else {
-				glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
-			}
-		}(assigned, topicPartition)
+		}
 	}

 	wg.Wait()
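The concurrency pattern is unchanged by this diff: a buffered channel used as a counting semaphore caps the number of partitions processed at once at `SubscriberConfig.MaxPartitionCount`, while the `sync.WaitGroup` lets `startProcessors` block until every per-partition goroutine finishes. Below is a minimal, standalone sketch of that pattern under illustrative names (`maxWorkers`, `job`, and the sleep are stand-ins, not part of the patch).

```go
// Sketch of the bounded-concurrency pattern used above: the buffered channel
// limits in-flight goroutines, the WaitGroup waits for all of them to drain.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxWorkers = 3 // analogous to SubscriberConfig.MaxPartitionCount

	var wg sync.WaitGroup
	semaphore := make(chan struct{}, maxWorkers)

	for job := 0; job < 10; job++ {
		wg.Add(1)
		semaphore <- struct{}{} // blocks once maxWorkers goroutines are running

		go func(job int) {
			defer func() {
				<-semaphore // release the slot
				wg.Done()
			}()
			time.Sleep(100 * time.Millisecond) // stand-in for onEachPartition
			fmt.Println("finished job", job)
		}(job)
	}

	wg.Wait() // block until all started workers have completed
}
```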