From 496fc8fbbfe85547e4b6a46f153076a27fd4e9aa Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 5 Jan 2024 15:35:19 -0800 Subject: [PATCH] refactor --- .../sub_client/connect_to_sub_coordinator.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e7749f94b..fa75f87fe 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -108,15 +108,15 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s Namespace: sub.ContentConfig.Namespace, Name: sub.ContentConfig.Topic, }, - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + TsNs: sub.alreadyProcessedTsNs, }, Filter: sub.ContentConfig.Filter, - Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ - StartTimestampNs: sub.alreadyProcessedTsNs, - }, }, }, }) @@ -148,6 +148,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { + glog.V(0).Infof("subscriber %s/%s/%s received nil message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } switch m := resp.Message.(type) {