Browse Source

proxy requests to lock owner

pull/5637/head
Chris Lu 11 months ago
parent
commit
4dc6681833
  1. 27
      weed/mq/broker/broker_connect.go
  2. 7
      weed/mq/broker/broker_grpc_balance.go
  3. 52
      weed/mq/broker/broker_grpc_configure.go
  4. 15
      weed/mq/broker/broker_grpc_lookup.go
  5. 6
      weed/mq/broker/broker_server.go
  6. 27
      weed/mq/client/pub_client/scheduler.go

27
weed/mq/broker/broker_connect.go

@ -4,9 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io" "io"
"math/rand" "math/rand"
@ -14,31 +12,16 @@ import (
) )
// BrokerConnectToBalancer connects to the broker balancer and sends stats // BrokerConnectToBalancer connects to the broker balancer and sends stats
func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
// find the lock owner
var brokerBalancer string
err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: pub_balancer.LockBrokerBalancer,
})
if err != nil {
return err
}
brokerBalancer = resp.Owner
return nil
})
if err != nil {
return err
}
b.currentBalancer = pb.ServerAddress(brokerBalancer)
func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error {
self := string(b.option.BrokerAddress())
glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
if brokerBalancer == "" { if brokerBalancer == "" {
return fmt.Errorf("no balancer found") return fmt.Errorf("no balancer found")
} }
// connect to the lock owner // connect to the lock owner
err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.PublisherToPubBalancer(context.Background()) stream, err := client.PublisherToPubBalancer(context.Background())
if err != nil { if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err) return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
@ -75,6 +58,4 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
return nil return nil
}) })
return err
} }

7
weed/mq/broker/broker_grpc_balance.go

@ -2,17 +2,18 @@ package broker
import ( import (
"context" "context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) { func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
if b.currentBalancer == "" {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer") return nil, status.Errorf(codes.Unavailable, "no balancer")
} }
if !b.lockAsBalancer.IsLocked() {
proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.BalanceTopics(ctx, request) resp, err = client.BalanceTopics(ctx, request)
return nil return nil
}) })

52
weed/mq/broker/broker_grpc_configure.go

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -15,11 +16,12 @@ import (
// It generates an assignments based on existing allocations, // It generates an assignments based on existing allocations,
// and then assign the partitions to the brokers. // and then assign the partitions to the brokers.
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) { func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
if b.currentBalancer == "" {
if !b.lockAsBalancer.IsLocked() {
glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked())
return nil, status.Errorf(codes.Unavailable, "no balancer") return nil, status.Errorf(codes.Unavailable, "no balancer")
} }
if !b.lockAsBalancer.IsLocked() {
proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ConfigureTopic(ctx, request) resp, err = client.ConfigureTopic(ctx, request)
return nil return nil
}) })
@ -30,38 +32,42 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
} }
t := topic.FromPbTopic(request.Topic) t := topic.FromPbTopic(request.Topic)
var readErr error
var readErr, assignErr error
resp, readErr = b.readTopicConfFromFiler(t) resp, readErr = b.readTopicConfFromFiler(t)
if readErr != nil { if readErr != nil {
glog.V(0).Infof("read topic %s conf: %v", request.Topic, err)
} else {
readErr = b.ensureTopicActiveAssignments(t, resp)
glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
}
if resp != nil {
assignErr = b.ensureTopicActiveAssignments(t, resp)
// no need to assign directly. // no need to assign directly.
// The added or updated assignees will read from filer directly. // The added or updated assignees will read from filer directly.
// The gone assignees will die by themselves. // The gone assignees will die by themselves.
} }
if readErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
} else {
if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 {
if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
}
}
resp = &mq_pb.ConfigureTopicResponse{}
if b.Balancer.Brokers.IsEmpty() {
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
}
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
return
}
// save the topic configuration on filer
if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {
return nil, fmt.Errorf("configure topic: %v", err)
if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
} }
}
resp = &mq_pb.ConfigureTopicResponse{}
if b.Balancer.Brokers.IsEmpty() {
return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
}
resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
// save the topic configuration on filer
if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {
return nil, fmt.Errorf("configure topic: %v", err)
} }
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err return resp, err

15
weed/mq/broker/broker_grpc_lookup.go

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -12,11 +13,11 @@ import (
// LookupTopicBrokers returns the brokers that are serving the topic // LookupTopicBrokers returns the brokers that are serving the topic
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
if b.currentBalancer == "" {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer") return nil, status.Errorf(codes.Unavailable, "no balancer")
} }
if !b.lockAsBalancer.IsLocked() { if !b.lockAsBalancer.IsLocked() {
proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request) resp, err = client.LookupTopicBrokers(ctx, request)
return nil return nil
}) })
@ -41,11 +42,11 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
} }
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
if b.currentBalancer == "" {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer") return nil, status.Errorf(codes.Unavailable, "no balancer")
} }
if !b.lockAsBalancer.IsLocked() {
proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request) resp, err = client.ListTopics(ctx, request)
return nil return nil
}) })
@ -76,3 +77,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
return ret, nil return ret, nil
} }
func (b *MessageQueueBroker) isLockOwner() bool {
return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
}

6
weed/mq/broker/broker_server.go

@ -46,7 +46,6 @@ type MessageQueueBroker struct {
localTopicManager *topic.LocalTopicManager localTopicManager *topic.LocalTopicManager
Balancer *pub_balancer.Balancer Balancer *pub_balancer.Balancer
lockAsBalancer *cluster.LiveLock lockAsBalancer *cluster.LiveLock
currentBalancer pb.ServerAddress
Coordinator *sub_coordinator.Coordinator Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex accessLock sync.Mutex
} }
@ -87,9 +86,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
balancer := mqBroker.lockAsBalancer.LockOwner()
if err := mqBroker.BrokerConnectToBalancer(balancer); err != nil {
glog.V(0).Infof("BrokerConnectToBalancer %s: %v", balancer, err)
if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil {
glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
} }
}) })
for { for {

27
weed/mq/client/pub_client/scheduler.go

@ -17,20 +17,21 @@ import (
type EachPartitionError struct { type EachPartitionError struct {
*mq_pb.BrokerPartitionAssignment *mq_pb.BrokerPartitionAssignment
Err error
Err error
generation int generation int
} }
type EachPartitionPublishJob struct { type EachPartitionPublishJob struct {
*mq_pb.BrokerPartitionAssignment *mq_pb.BrokerPartitionAssignment
stopChan chan bool
wg sync.WaitGroup
stopChan chan bool
wg sync.WaitGroup
generation int generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
} }
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
if err := p.doEnsureConfigureTopic(); err != nil {
if err := p.doConfigureTopic(); err != nil {
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
} }
@ -101,9 +102,9 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
// start a go routine to publish to this partition // start a go routine to publish to this partition
job := &EachPartitionPublishJob{ job := &EachPartitionPublishJob{
BrokerPartitionAssignment: assignment, BrokerPartitionAssignment: assignment,
stopChan: make(chan bool, 1),
generation: generation,
inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
stopChan: make(chan bool, 1),
generation: generation,
inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
} }
job.wg.Add(1) job.wg.Add(1)
go func(job *EachPartitionPublishJob) { go func(job *EachPartitionPublishJob) {
@ -135,13 +136,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
} }
publishClient := &PublishClient{ publishClient := &PublishClient{
SeaweedMessaging_PublishMessageClient: stream, SeaweedMessaging_PublishMessageClient: stream,
Broker: job.LeaderBroker,
Broker: job.LeaderBroker,
} }
if err = publishClient.Send(&mq_pb.PublishMessageRequest{ if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{ Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{ Init: &mq_pb.PublishMessageRequest_InitMessage{
Topic: p.config.Topic.ToPbTopic(),
Partition: job.Partition,
Topic: p.config.Topic.ToPbTopic(),
Partition: job.Partition,
AckInterval: 128, AckInterval: 128,
}, },
}, },
@ -202,7 +203,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return nil return nil
} }
func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
func (p *TopicPublisher) doConfigureTopic() (err error) {
if len(p.config.Brokers) == 0 { if len(p.config.Brokers) == 0 {
return fmt.Errorf("no bootstrap brokers") return fmt.Errorf("no bootstrap brokers")
} }
@ -213,7 +214,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
p.grpcDialOption, p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error { func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
Topic: p.config.Topic.ToPbTopic(),
Topic: p.config.Topic.ToPbTopic(),
PartitionCount: p.config.CreateTopicPartitionCount, PartitionCount: p.config.CreateTopicPartitionCount,
}) })
return err return err
@ -226,7 +227,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
} }
if lastErr != nil { if lastErr != nil {
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
} }
return nil return nil
} }

Loading…
Cancel
Save