diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 8ef85110a..f0c4a26ca 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -23,7 +23,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P Message: &mq_pb.FollowInMemoryMessagesRequest_Init{ Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{ ConsumerGroup: string(b.option.BrokerAddress()), - ConsumerId: fmt.Sprintf("followMe-%d", followerId), + ConsumerId: fmt.Sprintf("followMe@%s-%d", b.option.BrokerAddress(), followerId), FollowerId: followerId, Topic: request.Topic, PartitionOffset: &mq_pb.PartitionOffset{ @@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P } wg.Done() - b.doFollowInMemoryMessage(context.Background(), subscribeClient) + b.doFollowInMemoryMessage(context.Background(), followerId, subscribeClient) return nil }) @@ -60,7 +60,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P return &mq_pb.PublishFollowMeResponse{}, ret } -func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { +func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, followerId int32, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { for { resp, err := client.Recv() if err != nil { @@ -87,9 +87,11 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m } if m.Ctrl.FollowerChangedToId != 0 { // follower changed - glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId) + glog.V(0).Infof("doFollowInMemoryMessage follower changed from %d to %d", followerId, m.Ctrl.FollowerChangedToId) return } + default: + glog.V(0).Infof("doFollowInMemoryMessage unknown message type: %v", m) } } }