diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index 53ac27418..84af3a19c 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -125,7 +125,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig defer sub.OnCompletionFunc() } - partitionOffsetChan := make(chan int64, 1024) + type KeyedOffset struct { + Key []byte + Offset int64 + } + + partitionOffsetChan := make(chan KeyedOffset, 1024) defer func() { close(partitionOffsetChan) }() @@ -136,7 +141,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Sequence: ack, + Key: ack.Key, + Sequence: ack.Offset, }, }, }) @@ -161,7 +167,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig executors.Execute(func() { processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) if processErr == nil { - partitionOffsetChan <- m.Data.TsNs + partitionOffsetChan <- KeyedOffset{ + Key: m.Data.Key, + Offset: m.Data.TsNs, + } } else { lastErr = processErr }