Browse Source

publish and send to follower

mq-subscribe
chrislu 10 months ago
parent
commit
dab5454332
  1. 17
      weed/mq/broker/broker_grpc_pub.go
  2. 25
      weed/mq/topic/local_partition.go

17
weed/mq/broker/broker_grpc_pub.go

@ -143,21 +143,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
// send to the local partition
localTopicPartition.Publish(dataMessage)
receivedSequence = dataMessage.TsNs
// maybe send to the follower
if localTopicPartition.FollowerStream != nil {
println("recv", string(dataMessage.Key), dataMessage.TsNs)
if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: dataMessage,
},
}); followErr != nil {
return followErr
}
} else {
atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence)
if err = localTopicPartition.Publish(dataMessage); err != nil {
return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
}
}

25
weed/mq/topic/local_partition.go

@ -32,6 +32,7 @@ type LocalPartition struct {
FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
FollowerGrpcConnection *grpc.ClientConn
follower string
}
var TIME_FORMAT = "2006-01-02-15-04-05"
@ -54,8 +55,24 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.
return lp
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
// maybe send to the follower
if p.FollowerStream != nil {
println("recv", string(message.Key), message.TsNs)
if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message,
},
}); followErr != nil {
return fmt.Errorf("send to follower %s: %v", p.follower, followErr)
}
} else {
atomic.StoreInt64(&p.AckTsNs, message.TsNs)
}
return nil
}
func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
@ -137,11 +154,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return nil
}
follower := initMessage.FollowerBrokers[0]
p.follower = initMessage.FollowerBrokers[0]
ctx := context.Background()
p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption)
p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", follower, err)
return fmt.Errorf("fail to dial %s: %v", p.follower, err)
}
followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
p.FollowerStream, err = followerClient.PublishFollowMe(ctx)

Loading…
Cancel
Save