Browse Source

refactor

mq-subscribe
chrislu 10 months ago
parent
commit
059a120708
  1. 49
      weed/mq/broker/broker_grpc_pub.go
  2. 50
      weed/mq/topic/local_partition.go

49
weed/mq/broker/broker_grpc_pub.go

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
"io" "io"
@ -63,51 +62,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response) return stream.Send(response)
} }
// connect to follower brokers // connect to follower brokers
if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
follower := initMessage.FollowerBrokers[0]
ctx := context.Background()
localTopicPartition.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption)
if err != nil {
response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err)
glog.Errorf("fail to dial %s: %v", follower, err)
return stream.Send(response)
}
followerClient := mq_pb.NewSeaweedMessagingClient(localTopicPartition.FollowerGrpcConnection)
localTopicPartition.FollowerStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
response.Error = fmt.Sprintf("fail to create publish client: %v", err)
glog.Errorf("fail to create publish client: %v", err)
return stream.Send(response)
}
if err = localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
Partition: initMessage.Partition,
},
},
}); err != nil {
return err
}
// start receiving ack from follower
go func() {
defer func() {
println("stop receiving ack from follower")
}()
for {
ack, err := localTopicPartition.FollowerStream.Recv()
if err != nil {
glog.Errorf("Error receiving follower ack: %v", err)
return
}
atomic.StoreInt64(&localTopicPartition.AckTsNs, ack.AckTsNs)
println("recv ack", ack.AckTsNs)
}
}()
if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
response.Error = followerErr.Error()
glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
return stream.Send(response)
} }
var receivedSequence, acknowledgedSequence int64 var receivedSequence, acknowledgedSequence int64

50
weed/mq/topic/local_partition.go

@ -1,6 +1,7 @@
package topic package topic
import ( import (
"context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
@ -128,6 +129,55 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
} }
} }
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
if p.FollowerStream != nil {
return nil
}
if len(initMessage.FollowerBrokers) == 0 {
return nil
}
follower := initMessage.FollowerBrokers[0]
ctx := context.Background()
p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", follower, err)
}
followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
p.FollowerStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
return fmt.Errorf("fail to create publish client: %v", err)
}
if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
Partition: initMessage.Partition,
},
},
}); err != nil {
return err
}
// start receiving ack from follower
go func() {
defer func() {
println("stop receiving ack from follower")
}()
for {
ack, err := p.FollowerStream.Recv()
if err != nil {
glog.Errorf("Error receiving follower ack: %v", err)
return
}
atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
println("recv ack", ack.AckTsNs)
}
}()
return nil
}
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.MaybeShutdownLocalPartition() { if p.MaybeShutdownLocalPartition() {
if p.FollowerStream != nil { if p.FollowerStream != nil {

Loading…
Cancel
Save