Browse Source

follower receive a hello message before actual data

the leader can ensure the follower has connected
pull/5637/head
chrislu 10 months ago
parent
commit
f653838f34
  1. 20
      weed/mq/broker/broker_grpc_pub_follow.go
  2. 9
      weed/mq/broker/broker_grpc_sub.go

20
weed/mq/broker/broker_grpc_pub_follow.go

@ -8,11 +8,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io" "io"
"math/rand" "math/rand"
"sync"
"time" "time"
) )
func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
glog.V(0).Infof("PublishFollowMe %v", request) glog.V(0).Infof("PublishFollowMe %v", request)
var wg sync.WaitGroup
wg.Add(1)
var ret error
go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error { go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
followerId := rand.Int31() followerId := rand.Int31()
subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{ subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
@ -30,16 +34,30 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.
}, },
}, },
}) })
if err != nil { if err != nil {
glog.Errorf("FollowInMemoryMessages error: %v", err) glog.Errorf("FollowInMemoryMessages error: %v", err)
ret = err
return err return err
} }
// receive first hello message
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
}
if resp == nil {
glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
return io.ErrUnexpectedEOF
}
wg.Done()
b.doFollowInMemoryMessage(context.Background(), subscribeClient) b.doFollowInMemoryMessage(context.Background(), subscribeClient)
return nil return nil
}) })
return &mq_pb.PublishFollowMeResponse{}, nil
wg.Wait()
return &mq_pb.PublishFollowMeResponse{}, ret
} }
func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {

9
weed/mq/broker/broker_grpc_sub.go

@ -184,6 +184,15 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
}() }()
// send first hello message
// to indicate the follower is connected
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
},
},
})
var startPosition log_buffer.MessagePosition var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())

Loading…
Cancel
Save