diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 792376a69..8d5bf2044 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -10,7 +10,7 @@ import ( "io" ) -func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error { +func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error { // connect to the partition broker return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { @@ -61,15 +61,20 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency)) go func() { - for ack := range partitionOffsetChan { - subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ - Message: &mq_pb.SubscribeMessageRequest_Ack{ - Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Key: ack.Key, - Sequence: ack.Offset, + for { + select { + case <-stopCh: + break + case ack := <- partitionOffsetChan: + subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Ack{ + Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ + Key: ack.Key, + Sequence: ack.Offset, + }, }, - }, - }) + }) + } } subscribeClient.CloseSend() }() diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index e539d3da0..7208ce2ac 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -9,6 +9,7 @@ import ( ) type ProcessorState struct { + stopCh chan struct{} } // Subscribe subscribes to a topic's specified partitions. @@ -41,8 +42,11 @@ func (sub *TopicSubscriber) startProcessors() { sub.waitUntilNoOverlappingPartitionInFlight(topicPartition) // start a processors + stopChan := make(chan struct{}) sub.activeProcessorsLock.Lock() - sub.activeProcessors[topicPartition] = &ProcessorState{} + sub.activeProcessors[topicPartition] = &ProcessorState{ + stopCh: stopChan, + } sub.activeProcessorsLock.Unlock() go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) { @@ -55,7 +59,7 @@ func (sub *TopicSubscriber) startProcessors() { wg.Done() }() glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) - err := sub.onEachPartition(assigned) + err := sub.onEachPartition(assigned, stopChan) if err != nil { glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) } else { @@ -64,7 +68,13 @@ func (sub *TopicSubscriber) startProcessors() { }(assigned.PartitionAssignment, topicPartition) } if unAssignment := message.GetUnAssignment(); unAssignment != nil { - + topicPartition := topic.FromPbPartition(unAssignment.Partition) + sub.activeProcessorsLock.Lock() + if processor, found := sub.activeProcessors[topicPartition]; found { + close(processor.stopCh) + delete(sub.activeProcessors, topicPartition) + } + sub.activeProcessorsLock.Unlock() } }