chrislu 4 months ago
parent
commit
eb2d2979b1
  1. 24
      weed/mq/topic/local_partition.go

24
weed/mq/topic/local_partition.go

@ -28,7 +28,7 @@ type LocalPartition struct {
Publishers *LocalPartitionPublishers Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers Subscribers *LocalPartitionSubscribers
publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn followerGrpcConnection *grpc.ClientConn
Follower string Follower string
} }
@ -55,9 +55,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message) p.LogBuffer.AddToBuffer(message)
// maybe send to the follower // maybe send to the follower
if p.publishFolloweMeStream != nil {
if p.publishFollowMeStream != nil {
// println("recv", string(message.Key), message.TsNs) // println("recv", string(message.Key), message.TsNs)
if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{ Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message, Data: message,
}, },
@ -134,7 +134,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
} }
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
if p.publishFolloweMeStream != nil {
if p.publishFollowMeStream != nil {
return nil return nil
} }
if initMessage.FollowerBroker == "" { if initMessage.FollowerBroker == "" {
@ -148,11 +148,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return fmt.Errorf("fail to dial %s: %v", p.Follower, err) return fmt.Errorf("fail to dial %s: %v", p.Follower, err)
} }
followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection) followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
p.publishFollowMeStream, err = followerClient.PublishFollowMe(ctx)
if err != nil { if err != nil {
return fmt.Errorf("fail to create publish client: %v", err) return fmt.Errorf("fail to create publish client: %v", err)
} }
if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
if err = p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{ Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic, Topic: initMessage.Topic,
@ -170,7 +170,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}() }()
for { for {
ack, err := p.publishFolloweMeStream.Recv()
ack, err := p.publishFollowMeStream.Recv()
if err != nil { if err != nil {
e, _ := status.FromError(err) e, _ := status.FromError(err)
if e.Code() == codes.Canceled { if e.Code() == codes.Canceled {
@ -194,9 +194,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
for !p.LogBuffer.IsAllFlushed() { for !p.LogBuffer.IsAllFlushed() {
time.Sleep(113 * time.Millisecond) time.Sleep(113 * time.Millisecond)
} }
if p.publishFolloweMeStream != nil {
if p.publishFollowMeStream != nil {
// send close to the follower // send close to the follower
if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{ Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
}, },
@ -205,7 +205,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
} }
glog.V(4).Infof("closing grpcConnection to follower") glog.V(4).Infof("closing grpcConnection to follower")
p.followerGrpcConnection.Close() p.followerGrpcConnection.Close()
p.publishFolloweMeStream = nil
p.publishFollowMeStream = nil
p.Follower = "" p.Follower = ""
} }
@ -224,8 +224,8 @@ func (p *LocalPartition) Shutdown() {
} }
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
if p.publishFolloweMeStream != nil {
if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
if p.publishFollowMeStream != nil {
if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Flush{ Message: &mq_pb.PublishFollowMeRequest_Flush{
Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
TsNs: flushTsNs, TsNs: flushTsNs,

Loading…
Cancel
Save