diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 6cb907794..68bcc49b2 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -91,38 +91,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis }); err != nil { return err } - } - - // process each published messages - clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) - localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) - - ackCounter := 0 - var ackSequence int64 - defer func() { - if localTopicPartition.FollowerStream == nil { - // remove the publisher - localTopicPartition.Publishers.RemovePublisher(clientName) - glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) - if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, p) - } - } - }() - if localTopicPartition.FollowerStream != nil { + // start receiving ack from follower go func() { defer func() { println("stop receiving ack from follower") - - // remove the publisher - localTopicPartition.Publishers.RemovePublisher(clientName) - glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) - if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, p) - } - println("closing grpcConnection to follower") - localTopicPartition.GrpcConnection.Close() }() for { @@ -131,7 +104,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis glog.Errorf("Error receiving response: %v", err) return } - ackSequence = ack.AckTsNs println("recv ack", ack.AckTsNs) if err := stream.Send(&mq_pb.PublishMessageResponse{ AckSequence: ack.AckTsNs, @@ -143,6 +115,33 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis }() } + // process each published messages + clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) + localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) + + ackCounter := 0 + var ackSequence int64 + defer func() { + // remove the publisher + localTopicPartition.Publishers.RemovePublisher(clientName) + glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) + if localTopicPartition.MaybeShutdownLocalPartition() { + if localTopicPartition.FollowerStream != nil { + // send close to the follower + if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + println("closing grpcConnection to follower") + localTopicPartition.GrpcConnection.Close() + } + b.localTopicManager.RemoveTopicPartition(t, p) + } + }() + // send a hello message stream.Send(&mq_pb.PublishMessageResponse{})