|  |  | @ -22,6 +22,7 @@ type dataToFlush struct { | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | type LogBuffer struct { | 
			
		
	
		
			
				
					|  |  |  | 	name          string | 
			
		
	
		
			
				
					|  |  |  | 	prevBuffers   *SealedBuffers | 
			
		
	
		
			
				
					|  |  |  | 	buf           []byte | 
			
		
	
		
			
				
					|  |  |  | 	idx           []int | 
			
		
	
	
		
			
				
					|  |  | @ -39,8 +40,9 @@ type LogBuffer struct { | 
			
		
	
		
			
				
					|  |  |  | 	sync.RWMutex | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { | 
			
		
	
		
			
				
					|  |  |  | func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { | 
			
		
	
		
			
				
					|  |  |  | 	lb := &LogBuffer{ | 
			
		
	
		
			
				
					|  |  |  | 		name:          name, | 
			
		
	
		
			
				
					|  |  |  | 		prevBuffers:   newSealedBuffers(PreviousBufferCount), | 
			
		
	
		
			
				
					|  |  |  | 		buf:           make([]byte, BufferSize), | 
			
		
	
		
			
				
					|  |  |  | 		sizeBuf:       make([]byte, 4), | 
			
		
	
	
		
			
				
					|  |  | @ -93,6 +95,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | 	if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { | 
			
		
	
		
			
				
					|  |  |  | 		// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
 | 
			
		
	
		
			
				
					|  |  |  | 		m.flushChan <- m.copyToFlush() | 
			
		
	
		
			
				
					|  |  |  | 		m.startTime = ts | 
			
		
	
		
			
				
					|  |  |  | 		if len(m.buf) < size+4 { | 
			
		
	
	
		
			
				
					|  |  | @ -127,9 +130,10 @@ func (m *LogBuffer) Shutdown() { | 
			
		
	
		
			
				
					|  |  |  | func (m *LogBuffer) loopFlush() { | 
			
		
	
		
			
				
					|  |  |  | 	for d := range m.flushChan { | 
			
		
	
		
			
				
					|  |  |  | 		if d != nil { | 
			
		
	
		
			
				
					|  |  |  | 			// fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
 | 
			
		
	
		
			
				
					|  |  |  | 			// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
 | 
			
		
	
		
			
				
					|  |  |  | 			m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) | 
			
		
	
		
			
				
					|  |  |  | 			d.releaseMemory() | 
			
		
	
		
			
				
					|  |  |  | 			// local logbuffer is different from aggregate logbuffer here
 | 
			
		
	
		
			
				
					|  |  |  | 			m.lastFlushTime = d.stopTime | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
	
		
			
				
					|  |  | @ -143,7 +147,6 @@ func (m *LogBuffer) loopInterval() { | 
			
		
	
		
			
				
					|  |  |  | 			m.Unlock() | 
			
		
	
		
			
				
					|  |  |  | 			return | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		// println("loop interval")
 | 
			
		
	
		
			
				
					|  |  |  | 		toFlush := m.copyToFlush() | 
			
		
	
		
			
				
					|  |  |  | 		m.flushChan <- toFlush | 
			
		
	
		
			
				
					|  |  |  | 		m.Unlock() | 
			
		
	
	
		
			
				
					|  |  | @ -162,7 +165,6 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { | 
			
		
	
		
			
				
					|  |  |  | 				data:      copiedBytes(m.buf[:m.pos]), | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		// fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
 | 
			
		
	
		
			
				
					|  |  |  | 		m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) | 
			
		
	
		
			
				
					|  |  |  | 		m.pos = 0 | 
			
		
	
		
			
				
					|  |  |  | 		m.idx = m.idx[:0] | 
			
		
	
	
		
			
				
					|  |  | @ -200,12 +202,9 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 	if lastReadTime.Before(m.startTime) { | 
			
		
	
		
			
				
					|  |  |  | 		// println("checking ", lastReadTime.UnixNano())
 | 
			
		
	
		
			
				
					|  |  |  | 		for i, buf := range m.prevBuffers.buffers { | 
			
		
	
		
			
				
					|  |  |  | 		for _, buf := range m.prevBuffers.buffers { | 
			
		
	
		
			
				
					|  |  |  | 			if buf.startTime.After(lastReadTime) { | 
			
		
	
		
			
				
					|  |  |  | 				if i == 0 { | 
			
		
	
		
			
				
					|  |  |  | 					// println("return the earliest in memory", buf.startTime.UnixNano())
 | 
			
		
	
		
			
				
					|  |  |  | 					return copiedBytes(buf.buf[:buf.size]), nil | 
			
		
	
		
			
				
					|  |  |  | 				} | 
			
		
	
		
			
				
					|  |  |  | 				// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
 | 
			
		
	
		
			
				
					|  |  |  | 				// println("return the", i, "th in memory", buf.startTime.UnixNano())
 | 
			
		
	
		
			
				
					|  |  |  | 				return copiedBytes(buf.buf[:buf.size]), nil | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
	
		
			
				
					|  |  | @ -215,7 +214,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu | 
			
		
	
		
			
				
					|  |  |  | 				return copiedBytes(buf.buf[pos:buf.size]), nil | 
			
		
	
		
			
				
					|  |  |  | 			} | 
			
		
	
		
			
				
					|  |  |  | 		} | 
			
		
	
		
			
				
					|  |  |  | 		// println("return the current buf", lastReadTime.UnixNano())
 | 
			
		
	
		
			
				
					|  |  |  | 		// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadTime)
 | 
			
		
	
		
			
				
					|  |  |  | 		return copiedBytes(m.buf[:m.pos]), nil | 
			
		
	
		
			
				
					|  |  |  | 	} | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | 
 |