diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 993175112..fe5fe289a 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -96,7 +96,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) - f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate) for _, peerUpdate := range existingNodes { f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 89afb6e4d..4c86d813f 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -42,7 +42,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), filers: make(map[pb.ServerAddress]struct{}), } - mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate + mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/server/master_server.go b/weed/server/master_server.go index ecbfd64af..758f212ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -116,7 +116,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } ms.boundedLeaderChan = make(chan int, 16) - ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate + ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate) seq := ms.createSequencer(option) if nil == seq { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 2583bda80..6e91c31eb 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -28,7 +29,8 @@ type MasterClient struct { vidMap vidMapCacheSize int - OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) + OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) + OnPeerUpdateAccessLock sync.RWMutex } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { @@ -44,6 +46,12 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy } } +func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) { + mc.OnPeerUpdateAccessLock.Lock() + mc.OnPeerUpdate = onPeerUpdate + mc.OnPeerUpdateAccessLock.Unlock() +} + func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { return mc.LookupFileIdWithFallback } @@ -219,6 +227,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.ClusterNodeUpdate != nil { update := resp.ClusterNodeUpdate + mc.OnPeerUpdateAccessLock.RLock() if mc.OnPeerUpdate != nil { if update.FilerGroup == mc.FilerGroup { if update.IsAdd { @@ -230,6 +239,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL mc.OnPeerUpdate(update, time.Now()) } } + mc.OnPeerUpdateAccessLock.RUnlock() } } })