|  |  | @ -25,7 +25,7 @@ type MetaAggregator struct { | 
			
		
	
		
			
				
					|  |  |  | 	isLeader        bool | 
			
		
	
		
			
				
					|  |  |  | 	grpcDialOption  grpc.DialOption | 
			
		
	
		
			
				
					|  |  |  | 	MetaLogBuffer   *log_buffer.LogBuffer | 
			
		
	
		
			
				
					|  |  |  | 	peerStatues     map[pb.ServerAddress]struct{} | 
			
		
	
		
			
				
					|  |  |  | 	peerStatues     map[pb.ServerAddress]int | 
			
		
	
		
			
				
					|  |  |  | 	peerStatuesLock sync.Mutex | 
			
		
	
		
			
				
					|  |  |  | 	// notifying clients
 | 
			
		
	
		
			
				
					|  |  |  | 	ListenersLock sync.Mutex | 
			
		
	
	
		
			
				
					|  |  | @ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. | 
			
		
	
		
			
				
					|  |  |  | 		filer:          filer, | 
			
		
	
		
			
				
					|  |  |  | 		self:           self, | 
			
		
	
		
			
				
					|  |  |  | 		grpcDialOption: grpcDialOption, | 
			
		
	
		
			
				
					|  |  |  | 		peerStatues:    make(map[pb.ServerAddress]struct{}), | 
			
		
	
		
			
				
					|  |  |  | 		peerStatues:    make(map[pb.ServerAddress]int), | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	t.ListenersCond = sync.NewCond(&t.ListenersLock) | 
			
		
	
		
			
				
					|  |  |  | 	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { | 
			
		
	
	
		
			
				
					|  |  | @ -56,27 +56,40 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { | 
			
		
	
		
			
				
					|  |  |  | 	address := pb.ServerAddress(update.Address) | 
			
		
	
		
			
				
					|  |  |  | 	if update.IsAdd { | 
			
		
	
		
			
				
					|  |  |  | 		// every filer should subscribe to a new filer
 | 
			
		
	
		
			
				
					|  |  |  | 		ma.setActive(address, true) | 
			
		
	
		
			
				
					|  |  |  | 		go ma.subscribeToOneFiler(ma.filer, ma.self, address) | 
			
		
	
		
			
				
					|  |  |  | 		if ma.setActive(address, true) { | 
			
		
	
		
			
				
					|  |  |  | 			go ma.subscribeToOneFiler(ma.filer, ma.self, address) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} else { | 
			
		
	
		
			
				
					|  |  |  | 		ma.setActive(address, false) | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { | 
			
		
	
		
			
				
					|  |  |  | func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { | 
			
		
	
		
			
				
					|  |  |  | 	ma.peerStatuesLock.Lock() | 
			
		
	
		
			
				
					|  |  |  | 	defer ma.peerStatuesLock.Unlock() | 
			
		
	
		
			
				
					|  |  |  | 	if isActive { | 
			
		
	
		
			
				
					|  |  |  | 		ma.peerStatues[address] = struct{}{} | 
			
		
	
		
			
				
					|  |  |  | 		if _, found := ma.peerStatues[address]; found { | 
			
		
	
		
			
				
					|  |  |  | 			ma.peerStatues[address] += 1 | 
			
		
	
		
			
				
					|  |  |  | 		} else { | 
			
		
	
		
			
				
					|  |  |  | 			ma.peerStatues[address] = 1 | 
			
		
	
		
			
				
					|  |  |  | 			notDuplicated = true | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} else { | 
			
		
	
		
			
				
					|  |  |  | 		delete(ma.peerStatues, address) | 
			
		
	
		
			
				
					|  |  |  | 		if _, found := ma.peerStatues[address]; found { | 
			
		
	
		
			
				
					|  |  |  | 			ma.peerStatues[address] -= 1 | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		if ma.peerStatues[address] <= 0 { | 
			
		
	
		
			
				
					|  |  |  | 			delete(ma.peerStatues, address) | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	return | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { | 
			
		
	
		
			
				
					|  |  |  | 	ma.peerStatuesLock.Lock() | 
			
		
	
		
			
				
					|  |  |  | 	defer ma.peerStatuesLock.Unlock() | 
			
		
	
		
			
				
					|  |  |  | 	_, isActive = ma.peerStatues[address] | 
			
		
	
		
			
				
					|  |  |  | 	return | 
			
		
	
		
			
				
					|  |  |  | 	var count int | 
			
		
	
		
			
				
					|  |  |  | 	count, isActive = ma.peerStatues[address] | 
			
		
	
		
			
				
					|  |  |  | 	return count > 0 && isActive | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { | 
			
		
	
	
		
			
				
					|  |  | 
 |