From 604091a4807a56b7a56caeeb316c0e5a7642e57d Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 12 Mar 2024 09:18:54 -0700 Subject: [PATCH] use stopChan to close previous filer peer meta subscription instances --- weed/filer/meta_aggregator.go | 67 +++++++++++++++-------------------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 0433a63a0..e18e69216 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -21,13 +21,13 @@ import ( ) type MetaAggregator struct { - filer *Filer - self pb.ServerAddress - isLeader bool - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]int - peerStatuesLock sync.Mutex + filer *Filer + self pb.ServerAddress + isLeader bool + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + peerChans map[pb.ServerAddress]chan struct{} + peerChansLock sync.Mutex // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond @@ -40,7 +40,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]int), + peerChans: make(map[pb.ServerAddress]chan struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -50,51 +50,40 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. } func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + ma.peerChansLock.Lock() + defer ma.peerChansLock.Unlock() + address := pb.ServerAddress(update.Address) if update.IsAdd { - // every filer should subscribe to a new filer - if ma.setActive(address, true) { - go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom) - } - } else { - ma.setActive(address, false) - } -} - -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { - ma.peerStatuesLock.Lock() - defer ma.peerStatuesLock.Unlock() - if isActive { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] += 1 - } else { - ma.peerStatues[address] = 1 - notDuplicated = true + // cancel previous subscription if any + if prevChan, found := ma.peerChans[address]; found { + close(prevChan) } + stopChan := make(chan struct{}) + ma.peerChans[address] = stopChan + go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan) } else { - if _, found := ma.peerStatues[address]; found { - delete(ma.peerStatues, address) + if prevChan, found := ma.peerChans[address]; found { + close(prevChan) + delete(ma.peerChans, address) } } - return -} -func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { - ma.peerStatuesLock.Lock() - defer ma.peerStatuesLock.Unlock() - var count int - count, isActive = ma.peerStatues[address] - return count > 0 && isActive } -func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) { +func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) { lastTsNs := startFrom.UnixNano() for { glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs) nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs) - if !ma.isActive(peer) { - glog.V(0).Infof("stop subscribing remote %s meta change", peer) + + // check stopChan to see if we should stop + select { + case <-stopChan: + glog.V(0).Infof("stop subscribing peer %s meta change", peer) return + default: } + if err != nil { errLvl := glog.Level(0) if strings.Contains(err.Error(), "duplicated local subscription detected") {