diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index ab4320a9d..f8554ea5b 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -11,6 +11,8 @@ import ( "io" "math/rand" "net" + "sync/atomic" + "time" ) // PUB @@ -45,7 +47,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } response := &mq_pb.PublishMessageResponse{} // TODO check whether current broker should be the leader for the topic partition - ackInterval := 1 initMessage := req.GetInit() if initMessage == nil { response.Error = fmt.Sprintf("missing init message") @@ -62,7 +63,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return stream.Send(response) } - ackInterval = int(initMessage.AckInterval) // connect to follower brokers if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 { @@ -104,22 +104,49 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis glog.Errorf("Error receiving follower ack: %v", err) return } + atomic.StoreInt64(&localTopicPartition.AckTsNs, ack.AckTsNs) println("recv ack", ack.AckTsNs) - if err := stream.Send(&mq_pb.PublishMessageResponse{ - AckSequence: ack.AckTsNs, - }); err != nil { - glog.Errorf("Error sending publisher ack %v: %v", ack, err) - return - } } }() } + var receivedSequence, acknowledgedSequence int64 + var isClosed bool + + // start sending ack to publisher + ackInterval := int64(1) + if initMessage.AckInterval > 0 { + ackInterval = int64(initMessage.AckInterval) + } + go func() { + defer func() { + println("stop sending ack to publisher") + }() + + lastAckTime := time.Now() + for !isClosed { + receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs) + if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){ + acknowledgedSequence = receivedSequence + response := &mq_pb.PublishMessageResponse{ + AckSequence: acknowledgedSequence, + } + if err := stream.Send(response); err != nil { + glog.Errorf("Error sending response %v: %v", response, err) + } + println("sent ack", acknowledgedSequence) + lastAckTime = time.Now() + } else { + time.Sleep(1 * time.Second) + } + } + }() + + // process each published messages clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) - ackCounter := 0 var ackSequence int64 defer func() { // remove the publisher @@ -146,24 +173,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // send a hello message stream.Send(&mq_pb.PublishMessageResponse{}) - var receivedSequence, acknowledgedSequence int64 - defer func() { - if localTopicPartition.FollowerStream != nil { - //if err := followerStream.CloseSend(); err != nil { - // glog.Errorf("Error closing follower stream: %v", err) - //} - } else { - if acknowledgedSequence < receivedSequence { - acknowledgedSequence = receivedSequence - response := &mq_pb.PublishMessageResponse{ - AckSequence: acknowledgedSequence, - } - if err := stream.Send(response); err != nil { - glog.Errorf("Error sending response %v: %v", response, err) - } - } - } + isClosed = true }() // process each published messages @@ -175,7 +186,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis break } glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) - return err + break } // Process the received message @@ -199,37 +210,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return followErr } } else { - ackCounter++ - if ackCounter >= ackInterval { - ackCounter = 0 - // send back the ack directly - acknowledgedSequence = receivedSequence - response := &mq_pb.PublishMessageResponse{ - AckSequence: acknowledgedSequence, - } - if err := stream.Send(response); err != nil { - glog.Errorf("Error sending response %v: %v", response, err) - } - } + atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence) } } - if localTopicPartition.FollowerStream != nil { - // send close to the follower - if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ - Message: &mq_pb.PublishFollowMeRequest_Close{ - Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, - }, - }); followErr != nil { - return followErr - } - println("closing follower stream") - - //if err := followerStream.CloseSend(); err != nil { - // glog.Errorf("Error closing follower stream: %v", err) - //} - } - glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition) return nil diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index dbef9da89..34c2903f4 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -14,6 +14,7 @@ import ( type LocalPartition struct { ListenersWaits int64 + AckTsNs int64 // notifying clients ListenersLock sync.Mutex