diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index e4861e9bc..17d01f620 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -7,10 +7,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" + "io" "math/rand" "net" "sync/atomic" - "time" ) // PUB @@ -75,14 +75,17 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis respChan := make(chan *mq_pb.PublishMessageResponse, 128) defer func() { atomic.StoreInt32(&isStopping, 1) + respChan <- &mq_pb.PublishMessageResponse{ + AckSequence: ackSequence, + } close(respChan) localTopicPartition.Publishers.RemovePublisher(clientName) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveTopicPartition(t, p) } + glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence) }() go func() { - ticker := time.NewTicker(1 * time.Second) for { select { case resp := <-respChan: @@ -93,15 +96,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } else { return } - case <-ticker.C: - if atomic.LoadInt32(&isStopping) == 0 { - response := &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - } - respChan <- response - } else { - return - } case <-localTopicPartition.StopPublishersCh: respChan <- &mq_pb.PublishMessageResponse{ AckSequence: ackSequence, @@ -116,6 +110,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // receive a message req, err := stream.Recv() if err != nil { + if err == io.EOF { + break + } + glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) return err } diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index c24ac0384..89d131580 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -158,7 +158,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro go func() { for { - _, err := publishClient.Recv() + ackResp, err := publishClient.Recv() if err != nil { e, ok := status.FromError(err) if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { @@ -168,9 +168,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) return } + if ackResp.Error != "" { + publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, ackResp.Error) + return + } + if ackResp.AckSequence > 0 { + log.Printf("ack %d", ackResp.AckSequence) + } } }() + publishCounter := 0 for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { if err := publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Data{ @@ -179,7 +188,17 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro }); err != nil { return fmt.Errorf("send publish data: %v", err) } + publishCounter++ + } + + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) } + + time.Sleep(3 * time.Second) + + log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + return nil }