Browse Source

use stopChan to close previous filer peer meta subscription instances

pull/5378/head
chrislu 12 months ago
parent
commit
604091a480
  1. 57
      weed/filer/meta_aggregator.go

57
weed/filer/meta_aggregator.go

@ -26,8 +26,8 @@ type MetaAggregator struct {
isLeader bool
grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer
peerStatues map[pb.ServerAddress]int
peerStatuesLock sync.Mutex
peerChans map[pb.ServerAddress]chan struct{}
peerChansLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@ -40,7 +40,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
filer: filer,
self: self,
grpcDialOption: grpcDialOption,
peerStatues: make(map[pb.ServerAddress]int),
peerChans: make(map[pb.ServerAddress]chan struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@ -50,51 +50,40 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
}
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
ma.peerChansLock.Lock()
defer ma.peerChansLock.Unlock()
address := pb.ServerAddress(update.Address)
if update.IsAdd {
// every filer should subscribe to a new filer
if ma.setActive(address, true) {
go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom)
}
} else {
ma.setActive(address, false)
// cancel previous subscription if any
if prevChan, found := ma.peerChans[address]; found {
close(prevChan)
}
}
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
if isActive {
if _, found := ma.peerStatues[address]; found {
ma.peerStatues[address] += 1
stopChan := make(chan struct{})
ma.peerChans[address] = stopChan
go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan)
} else {
ma.peerStatues[address] = 1
notDuplicated = true
if prevChan, found := ma.peerChans[address]; found {
close(prevChan)
delete(ma.peerChans, address)
}
} else {
if _, found := ma.peerStatues[address]; found {
delete(ma.peerStatues, address)
}
}
return
}
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
var count int
count, isActive = ma.peerStatues[address]
return count > 0 && isActive
}
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {
glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
if !ma.isActive(peer) {
glog.V(0).Infof("stop subscribing remote %s meta change", peer)
// check stopChan to see if we should stop
select {
case <-stopChan:
glog.V(0).Infof("stop subscribing peer %s meta change", peer)
return
default:
}
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {

Loading…
Cancel
Save