From 34b743c481e65b25c052512603d1794eddfe839c Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 26 Mar 2022 12:33:45 -0700 Subject: [PATCH] Revert "remove duplicated metadata subscription in filer" This reverts commit 34742be0295998c2105a5ee50e3e77ef2397c403. Related to https://github.com/chrislusf/seaweedfs/issues/2545 --- weed/filer/meta_aggregator.go | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 1e8b89ad5..13c2239f0 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -25,7 +25,7 @@ type MetaAggregator struct { isLeader bool grpcDialOption grpc.DialOption MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]int + peerStatues map[pb.ServerAddress]struct{} peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex @@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]int), + peerStatues: make(map[pb.ServerAddress]struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -56,40 +56,27 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { address := pb.ServerAddress(update.Address) if update.IsAdd { // every filer should subscribe to a new filer - if ma.setActive(address, true) { - go ma.subscribeToOneFiler(ma.filer, ma.self, address) - } + ma.setActive(address, true) + go ma.subscribeToOneFiler(ma.filer, ma.self, address) } else { ma.setActive(address, false) } } -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() if isActive { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] += 1 - } else { - ma.peerStatues[address] = 1 - notDuplicated = true - } + ma.peerStatues[address] = struct{}{} } else { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] -= 1 - } - if ma.peerStatues[address] <= 0 { - delete(ma.peerStatues, address) - } + delete(ma.peerStatues, address) } - return } func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() - var count int - count, isActive = ma.peerStatues[address] - return count > 0 && isActive + _, isActive = ma.peerStatues[address] + return } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {