Browse Source

separate goroutine to send ack to publisher

mq-subscribe
chrislu 9 months ago
parent
commit
5cc94a05b9
  1. 94
      weed/mq/broker/broker_grpc_pub.go
  2. 1
      weed/mq/topic/local_partition.go

94
weed/mq/broker/broker_grpc_pub.go

@ -11,6 +11,8 @@ import (
"io"
"math/rand"
"net"
"sync/atomic"
"time"
)
// PUB
@ -45,7 +47,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
response := &mq_pb.PublishMessageResponse{}
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
@ -62,7 +63,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
// connect to follower brokers
if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
@ -104,22 +104,49 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.Errorf("Error receiving follower ack: %v", err)
return
}
atomic.StoreInt64(&localTopicPartition.AckTsNs, ack.AckTsNs)
println("recv ack", ack.AckTsNs)
if err := stream.Send(&mq_pb.PublishMessageResponse{
AckSequence: ack.AckTsNs,
}); err != nil {
glog.Errorf("Error sending publisher ack %v: %v", ack, err)
return
}
}
}()
}
var receivedSequence, acknowledgedSequence int64
var isClosed bool
// start sending ack to publisher
ackInterval := int64(1)
if initMessage.AckInterval > 0 {
ackInterval = int64(initMessage.AckInterval)
}
go func() {
defer func() {
println("stop sending ack to publisher")
}()
lastAckTime := time.Now()
for !isClosed {
receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending response %v: %v", response, err)
}
println("sent ack", acknowledgedSequence)
lastAckTime = time.Now()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
ackCounter := 0
var ackSequence int64
defer func() {
// remove the publisher
@ -146,24 +173,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// send a hello message
stream.Send(&mq_pb.PublishMessageResponse{})
var receivedSequence, acknowledgedSequence int64
defer func() {
if localTopicPartition.FollowerStream != nil {
//if err := followerStream.CloseSend(); err != nil {
// glog.Errorf("Error closing follower stream: %v", err)
//}
} else {
if acknowledgedSequence < receivedSequence {
acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending response %v: %v", response, err)
}
}
}
isClosed = true
}()
// process each published messages
@ -175,7 +186,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
break
}
glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
return err
break
}
// Process the received message
@ -199,37 +210,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return followErr
}
} else {
ackCounter++
if ackCounter >= ackInterval {
ackCounter = 0
// send back the ack directly
acknowledgedSequence = receivedSequence
response := &mq_pb.PublishMessageResponse{
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending response %v: %v", response, err)
}
}
atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence)
}
}
if localTopicPartition.FollowerStream != nil {
// send close to the follower
if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
}); followErr != nil {
return followErr
}
println("closing follower stream")
//if err := followerStream.CloseSend(); err != nil {
// glog.Errorf("Error closing follower stream: %v", err)
//}
}
glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
return nil

1
weed/mq/topic/local_partition.go

@ -14,6 +14,7 @@ import (
type LocalPartition struct {
ListenersWaits int64
AckTsNs int64
// notifying clients
ListenersLock sync.Mutex

Loading…
Cancel
Save