From e90ab4ac604a7bdc02a25bc8dde5b6dde52272a3 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 26 Aug 2022 22:18:49 +0500 Subject: [PATCH] avoid race conditions for OnPeerUpdate (#3525) https://github.com/seaweedfs/seaweedfs/issues/3524 --- weed/filer/filer.go | 2 +- weed/mq/broker/broker_server.go | 2 +- weed/server/master_server.go | 2 +- weed/wdclient/masterclient.go | 12 +++++++++++- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 993175112..fe5fe289a 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -96,7 +96,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) - f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate) for _, peerUpdate := range existingNodes { f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 89afb6e4d..4c86d813f 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -42,7 +42,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), filers: make(map[pb.ServerAddress]struct{}), } - mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate + mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/server/master_server.go b/weed/server/master_server.go index ecbfd64af..758f212ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -116,7 +116,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } ms.boundedLeaderChan = make(chan int, 16) - ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate + ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate) seq := ms.createSequencer(option) if nil == seq { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 2583bda80..6e91c31eb 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -28,7 +29,8 @@ type MasterClient struct { vidMap vidMapCacheSize int - OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) + OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) + OnPeerUpdateAccessLock sync.RWMutex } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { @@ -44,6 +46,12 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy } } +func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) { + mc.OnPeerUpdateAccessLock.Lock() + mc.OnPeerUpdate = onPeerUpdate + mc.OnPeerUpdateAccessLock.Unlock() +} + func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { return mc.LookupFileIdWithFallback } @@ -219,6 +227,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.ClusterNodeUpdate != nil { update := resp.ClusterNodeUpdate + mc.OnPeerUpdateAccessLock.RLock() if mc.OnPeerUpdate != nil { if update.FilerGroup == mc.FilerGroup { if update.IsAdd { @@ -230,6 +239,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL mc.OnPeerUpdate(update, time.Now()) } } + mc.OnPeerUpdateAccessLock.RUnlock() } } })