From e7823ee967e0a199081605e156a84c785e9839b1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 9 Mar 2024 12:56:49 -0800 Subject: [PATCH] retry connecting to broker leader --- weed/mq/broker/broker_connect.go | 47 ++++++++++++++++++++++++++++++-- weed/mq/broker/broker_server.go | 15 +++------- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go index 602461c82..859e330a7 100644 --- a/weed/mq/broker/broker_connect.go +++ b/weed/mq/broker/broker_connect.go @@ -12,7 +12,8 @@ import ( ) // BrokerConnectToBalancer connects to the broker balancer and sends stats -func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error { +func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stopCh chan struct{}) error { + self := string(b.option.BrokerAddress()) glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer) @@ -39,6 +40,13 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro } for { + // check if the broker is stopping + select { + case <-stopCh: + return nil + default: + } + stats := b.localTopicManager.CollectStats(time.Second * 5) err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{ Message: &mq_pb.PublisherToPubBalancerRequest_Stats{ @@ -55,7 +63,40 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) } - - return nil }) } + +func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh chan string) { + var stopPrevRunChan chan struct{} + for { + select { + case newBrokerBalancer := <-newBrokerBalancerCh: + if stopPrevRunChan != nil { + close(stopPrevRunChan) + stopPrevRunChan = nil + } + thisRunStopChan := make(chan struct{}) + if newBrokerBalancer != "" { + stopPrevRunChan = thisRunStopChan + go func() { + for { + err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan) + if err != nil { + glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err) + time.Sleep(time.Second) + } else { + break + } + + select { + case <-thisRunStopChan: + return + default: + } + + } + }() + } + } + } +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 492d088e2..9c321744b 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,7 +1,6 @@ package broker import ( - "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -84,19 +83,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial self := option.BrokerAddress() glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler) + newBrokerBalancerCh := make(chan string, 1) lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { - // FIXME this is a blocking call, should be in a goroutine - if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil { - glog.V(0).Infof("BrokerConnectToBalancer %s: %v", newLockOwner, err) - } + glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner) + newBrokerBalancerCh <- newLockOwner }) - for { - time.Sleep(lock_manager.RenewInterval) - if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.LiveLockTTL); err != nil { - glog.V(4).Infof("AttemptToLock: %v", err) - } - } + mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh) }() return mqBroker, nil