Browse Source

sending keyed offset

pull/5890/head
chrislu 8 months ago
parent
commit
f13475ac2e
  1. 15
      weed/mq/client/sub_client/connect_to_sub_coordinator.go

15
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -125,7 +125,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
defer sub.OnCompletionFunc() defer sub.OnCompletionFunc()
} }
partitionOffsetChan := make(chan int64, 1024)
type KeyedOffset struct {
Key []byte
Offset int64
}
partitionOffsetChan := make(chan KeyedOffset, 1024)
defer func() { defer func() {
close(partitionOffsetChan) close(partitionOffsetChan)
}() }()
@ -136,7 +141,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{ Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
Sequence: ack,
Key: ack.Key,
Sequence: ack.Offset,
}, },
}, },
}) })
@ -161,7 +167,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
executors.Execute(func() { executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil { if processErr == nil {
partitionOffsetChan <- m.Data.TsNs
partitionOffsetChan <- KeyedOffset{
Key: m.Data.Key,
Offset: m.Data.TsNs,
}
} else { } else {
lastErr = processErr lastErr = processErr
} }

Loading…
Cancel
Save