diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index e74d7025f..533e32f18 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -8,11 +8,15 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" "math/rand" + "sync" "time" ) func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){ glog.V(0).Infof("PublishFollowMe %v", request) + var wg sync.WaitGroup + wg.Add(1) + var ret error go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error { followerId := rand.Int31() subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{ @@ -30,16 +34,30 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb. }, }, }) + if err != nil { glog.Errorf("FollowInMemoryMessages error: %v", err) + ret = err return err } + // receive first hello message + resp, err := subscribeClient.Recv() + if err != nil { + return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err) + } + if resp == nil { + glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response") + return io.ErrUnexpectedEOF + } + wg.Done() + b.doFollowInMemoryMessage(context.Background(), subscribeClient) return nil }) - return &mq_pb.PublishFollowMeResponse{}, nil + wg.Wait() + return &mq_pb.PublishFollowMeResponse{}, ret } func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 3280be2c0..5fd4522bd 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -184,6 +184,15 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) }() + // send first hello message + // to indicate the follower is connected + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + }, + }, + }) + var startPosition log_buffer.MessagePosition if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())