|
|
@ -3,6 +3,7 @@ package log_buffer |
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"sync" |
|
|
|
"sync/atomic" |
|
|
|
"time" |
|
|
|
|
|
|
|
"google.golang.org/protobuf/proto" |
|
|
@ -34,7 +35,7 @@ type LogBuffer struct { |
|
|
|
flushInterval time.Duration |
|
|
|
flushFn func(startTime, stopTime time.Time, buf []byte) |
|
|
|
notifyFn func() |
|
|
|
isStopping bool |
|
|
|
isStopping *atomic.Bool |
|
|
|
flushChan chan *dataToFlush |
|
|
|
lastTsNs int64 |
|
|
|
sync.RWMutex |
|
|
@ -50,6 +51,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi |
|
|
|
flushFn: flushFn, |
|
|
|
notifyFn: notifyFn, |
|
|
|
flushChan: make(chan *dataToFlush, 256), |
|
|
|
isStopping: new(atomic.Bool), |
|
|
|
} |
|
|
|
go lb.loopFlush() |
|
|
|
go lb.loopInterval() |
|
|
@ -119,20 +121,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) |
|
|
|
} |
|
|
|
|
|
|
|
func (m *LogBuffer) IsStopping() bool { |
|
|
|
m.RLock() |
|
|
|
defer m.RUnlock() |
|
|
|
|
|
|
|
return m.isStopping |
|
|
|
return m.isStopping.Load() |
|
|
|
} |
|
|
|
|
|
|
|
func (m *LogBuffer) Shutdown() { |
|
|
|
m.Lock() |
|
|
|
defer m.Unlock() |
|
|
|
|
|
|
|
if m.isStopping { |
|
|
|
isAlreadyStopped := m.isStopping.Swap(true) |
|
|
|
if isAlreadyStopped { |
|
|
|
return |
|
|
|
} |
|
|
|
m.isStopping = true |
|
|
|
toFlush := m.copyToFlush() |
|
|
|
m.flushChan <- toFlush |
|
|
|
close(m.flushChan) |
|
|
|