diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go index 33040c6a2..602461c82 100644 --- a/weed/mq/broker/broker_connect.go +++ b/weed/mq/broker/broker_connect.go @@ -4,9 +4,7 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" "math/rand" @@ -14,31 +12,16 @@ import ( ) // BrokerConnectToBalancer connects to the broker balancer and sends stats -func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error { - // find the lock owner - var brokerBalancer string - err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ - Name: pub_balancer.LockBrokerBalancer, - }) - if err != nil { - return err - } - brokerBalancer = resp.Owner - return nil - }) - if err != nil { - return err - } - b.currentBalancer = pb.ServerAddress(brokerBalancer) +func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error { + self := string(b.option.BrokerAddress()) - glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer) + glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer) if brokerBalancer == "" { return fmt.Errorf("no balancer found") } // connect to the lock owner - err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { stream, err := client.PublisherToPubBalancer(context.Background()) if err != nil { return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err) @@ -75,6 +58,4 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error { return nil }) - - 
return err } diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go index c09161ff9..4d74cede4 100644 --- a/weed/mq/broker/broker_grpc_balance.go +++ b/weed/mq/broker/broker_grpc_balance.go @@ -2,17 +2,18 @@ package broker import ( "context" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) { - if b.currentBalancer == "" { + if !b.lockAsBalancer.IsLocked() { return nil, status.Errorf(codes.Unavailable, "no balancer") } - if !b.lockAsBalancer.IsLocked() { - proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + if !b.isLockOwner() { + proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.BalanceTopics(ctx, request) return nil }) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 318e2f6da..d882eeea7 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -15,11 +16,12 @@ import ( // It generates an assignments based on existing allocations, // and then assign the partitions to the brokers. 
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) { - if b.currentBalancer == "" { + if !b.lockAsBalancer.IsLocked() { + glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked()) return nil, status.Errorf(codes.Unavailable, "no balancer") } - if !b.lockAsBalancer.IsLocked() { - proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + if !b.isLockOwner() { + proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.ConfigureTopic(ctx, request) return nil }) @@ -30,38 +32,42 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } t := topic.FromPbTopic(request.Topic) - var readErr error + var readErr, assignErr error resp, readErr = b.readTopicConfFromFiler(t) if readErr != nil { - glog.V(0).Infof("read topic %s conf: %v", request.Topic, err) - } else { - readErr = b.ensureTopicActiveAssignments(t, resp) + glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr) + } + + if resp != nil { + assignErr = b.ensureTopicActiveAssignments(t, resp) // no need to assign directly. // The added or updated assignees will read from filer directly. // The gone assignees will die by themselves. 
} - if readErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { + + if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) - } else { - if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 { - if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil { - glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) - } - } - resp = &mq_pb.ConfigureTopicResponse{} - if b.Balancer.Brokers.IsEmpty() { - return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) - } - resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) + return + } - // save the topic configuration on filer - if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil { - return nil, fmt.Errorf("configure topic: %v", err) + if resp != nil && len(resp.BrokerPartitionAssignments) > 0 { + if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil { + glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) } + } + resp = &mq_pb.ConfigureTopicResponse{} + if b.Balancer.Brokers.IsEmpty() { + return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error()) + } + resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount) - b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) + // save the topic configuration on filer + if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil { + return nil, fmt.Errorf("configure topic: %v", err) } + 
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) + glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) return resp, err diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 966345d94..7881090c9 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -12,11 +13,11 @@ import ( // LookupTopicBrokers returns the brokers that are serving the topic func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { - if b.currentBalancer == "" { + if !b.lockAsBalancer.IsLocked() { return nil, status.Errorf(codes.Unavailable, "no balancer") } - if !b.lockAsBalancer.IsLocked() { - proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + if !b.isLockOwner() { + proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.LookupTopicBrokers(ctx, request) return nil }) @@ -41,11 +42,11 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq } func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { - if b.currentBalancer == "" { + if !b.lockAsBalancer.IsLocked() { return nil, status.Errorf(codes.Unavailable, "no balancer") } - if !b.lockAsBalancer.IsLocked() { - proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error { + if !b.isLockOwner() { + proxyErr := 
b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.ListTopics(ctx, request) return nil }) @@ -76,3 +77,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List return ret, nil } + +func (b *MessageQueueBroker) isLockOwner() bool { + return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String() +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 15a3fea16..773ee19cb 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -46,7 +46,6 @@ type MessageQueueBroker struct { localTopicManager *topic.LocalTopicManager Balancer *pub_balancer.Balancer lockAsBalancer *cluster.LiveLock - currentBalancer pb.ServerAddress Coordinator *sub_coordinator.Coordinator accessLock sync.Mutex } @@ -87,9 +86,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { - balancer := mqBroker.lockAsBalancer.LockOwner() - if err := mqBroker.BrokerConnectToBalancer(balancer); err != nil { - glog.V(0).Infof("BrokerConnectToBalancer %s: %v", balancer, err) + if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil { + glog.V(0).Infof("BrokerConnectToBalancer: %v", err) } }) for { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index 89d131580..d577e77e0 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -17,20 +17,21 @@ import ( type EachPartitionError struct { *mq_pb.BrokerPartitionAssignment - Err error + Err error generation int } type EachPartitionPublishJob struct { *mq_pb.BrokerPartitionAssignment - stopChan chan bool - wg sync.WaitGroup + stopChan chan bool + wg 
sync.WaitGroup generation int inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } + func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { - if err := p.doEnsureConfigureTopic(); err != nil { + if err := p.doConfigureTopic(); err != nil { return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } @@ -101,9 +102,9 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. // start a go routine to publish to this partition job := &EachPartitionPublishJob{ BrokerPartitionAssignment: assignment, - stopChan: make(chan bool, 1), - generation: generation, - inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024), + stopChan: make(chan bool, 1), + generation: generation, + inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024), } job.wg.Add(1) go func(job *EachPartitionPublishJob) { @@ -135,13 +136,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro } publishClient := &PublishClient{ SeaweedMessaging_PublishMessageClient: stream, - Broker: job.LeaderBroker, + Broker: job.LeaderBroker, } if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ - Topic: p.config.Topic.ToPbTopic(), - Partition: job.Partition, + Topic: p.config.Topic.ToPbTopic(), + Partition: job.Partition, AckInterval: 128, }, }, @@ -202,7 +203,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return nil } -func (p *TopicPublisher) doEnsureConfigureTopic() (err error) { +func (p *TopicPublisher) doConfigureTopic() (err error) { if len(p.config.Brokers) == 0 { return fmt.Errorf("no bootstrap brokers") } @@ -213,7 +214,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) { p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ - Topic: 
p.config.Topic.ToPbTopic(), + Topic: p.config.Topic.ToPbTopic(), PartitionCount: p.config.CreateTopicPartitionCount, }) return err @@ -226,7 +227,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) { } if lastErr != nil { - return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) + return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, lastErr) } return nil }