From 23a72db1df85a275a5f63a8b76d1990667434994 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 30 May 2024 00:27:44 -0700 Subject: [PATCH] stop partitionOffsetChan if closed --- weed/mq/client/sub_client/on_each_partition.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 14e427713..d0222c370 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -67,6 +67,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.CloseSend() return case ack := <-partitionOffsetChan: + case ack, ok := <-partitionOffsetChan: + if !ok { + subscribeClient.CloseSend() + return + } subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{