|
@ -58,9 +58,13 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi |
|
|
|
|
|
|
|
|
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
|
|
|
|
|
|
|
|
|
var toFlush *dataToFlush |
|
|
m.Lock() |
|
|
m.Lock() |
|
|
defer func() { |
|
|
defer func() { |
|
|
m.Unlock() |
|
|
m.Unlock() |
|
|
|
|
|
if toFlush != nil { |
|
|
|
|
|
m.flushChan <- toFlush |
|
|
|
|
|
} |
|
|
if m.notifyFn != nil { |
|
|
if m.notifyFn != nil { |
|
|
m.notifyFn() |
|
|
m.notifyFn() |
|
|
} |
|
|
} |
|
@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) |
|
|
|
|
|
|
|
|
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { |
|
|
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { |
|
|
// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
|
|
|
// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
|
|
|
m.flushChan <- m.copyToFlush() |
|
|
|
|
|
|
|
|
toFlush = m.copyToFlush() |
|
|
m.startTime = ts |
|
|
m.startTime = ts |
|
|
if len(m.buf) < size+4 { |
|
|
if len(m.buf) < size+4 { |
|
|
m.buf = make([]byte, 2*size+4) |
|
|
m.buf = make([]byte, 2*size+4) |
|
@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
toFlush := m.copyToFlush() |
|
|
toFlush := m.copyToFlush() |
|
|
m.flushChan <- toFlush |
|
|
|
|
|
m.Unlock() |
|
|
m.Unlock() |
|
|
|
|
|
if toFlush != nil { |
|
|
|
|
|
m.flushChan <- toFlush |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|