Browse Source

retry connecting to broker leader

pull/5637/head
chrislu 11 months ago
parent
commit
e7823ee967
  1. 47
      weed/mq/broker/broker_connect.go
  2. 15
      weed/mq/broker/broker_server.go

47
weed/mq/broker/broker_connect.go

@ -12,7 +12,8 @@ import (
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error {
func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stopCh chan struct{}) error {
self := string(b.option.BrokerAddress())
glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
@ -39,6 +40,13 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro
}
for {
// check if the broker is stopping
select {
case <-stopCh:
return nil
default:
}
stats := b.localTopicManager.CollectStats(time.Second * 5)
err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
@ -55,7 +63,40 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
return nil
})
}
func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh chan string) {
var stopPrevRunChan chan struct{}
for {
select {
case newBrokerBalancer := <-newBrokerBalancerCh:
if stopPrevRunChan != nil {
close(stopPrevRunChan)
stopPrevRunChan = nil
}
thisRunStopChan := make(chan struct{})
if newBrokerBalancer != "" {
stopPrevRunChan = thisRunStopChan
go func() {
for {
err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan)
if err != nil {
glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
time.Sleep(time.Second)
} else {
break
}
select {
case <-thisRunStopChan:
return
default:
}
}
}()
}
}
}
}

15
weed/mq/broker/broker_server.go

@ -1,7 +1,6 @@
package broker
import (
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@ -84,19 +83,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
self := option.BrokerAddress()
glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
newBrokerBalancerCh := make(chan string, 1)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
// FIXME this is a blocking call, should be in a goroutine
if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil {
glog.V(0).Infof("BrokerConnectToBalancer %s: %v", newLockOwner, err)
}
glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
newBrokerBalancerCh <- newLockOwner
})
for {
time.Sleep(lock_manager.RenewInterval)
if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
glog.V(4).Infof("AttemptToLock: %v", err)
}
}
mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
}()
return mqBroker, nil

Loading…
Cancel
Save