Browse Source

fix

pull/5890/head
chrislu 8 months ago
parent
commit
65dd5ac6fb
  1. 3
      weed/mq/broker/broker_grpc_sub.go

3
weed/mq/broker/broker_grpc_sub.go

@ -101,6 +101,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
} }
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence) imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
currentLastOffset := imt.GetOldest() currentLastOffset := imt.GetOldest()
fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset { if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{ Message: &mq_pb.SubscribeFollowMeRequest_Ack{
@ -161,6 +162,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// reset the sleep interval count // reset the sleep interval count
sleepIntervalCount = 0 sleepIntervalCount = 0
imt.InflightMessage(logEntry.Key, logEntry.TsNs)
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{ Data: &mq_pb.DataMessage{
Key: logEntry.Key, Key: logEntry.Key,

Loading…
Cancel
Save