Browse Source

use atomic int64

mq
chrislu 6 days ago
parent
commit
c24b7da159
  1. 8
      weed/util/log_buffer/log_buffer.go
  2. 11
      weed/util/log_buffer/log_read.go

8
weed/util/log_buffer/log_buffer.go

@ -46,7 +46,7 @@ type LogBuffer struct {
isStopping *atomic.Bool
isAllFlushed bool
flushChan chan *dataToFlush
LastTsNs int64
LastTsNs atomic.Int64
sync.RWMutex
}
@ -95,12 +95,12 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
} else {
ts = time.Unix(0, processingTsNs)
}
if logBuffer.LastTsNs >= processingTsNs {
if logBuffer.LastTsNs.Load() >= processingTsNs {
// this is unlikely to happen, but just in case
processingTsNs = logBuffer.LastTsNs + 1
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
}
logBuffer.LastTsNs = processingTsNs
logBuffer.LastTsNs.Store(processingTsNs)
logEntry := &filer_pb.LogEntry{
TsNs: processingTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),

11
weed/util/log_buffer/log_read.go

@ -66,17 +66,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
isDone = true
return
}
logBuffer.RLock()
lastTsNs := logBuffer.LastTsNs
logBuffer.RUnlock()
loopTsNs := lastTsNs // make a copy
lastTsNs := logBuffer.LastTsNs.Load()
for lastTsNs == loopTsNs {
for lastTsNs == logBuffer.LastTsNs.Load() {
if waitForDataFn() {
// Update loopTsNs and loop again
logBuffer.RLock()
loopTsNs = logBuffer.LastTsNs
logBuffer.RUnlock()
continue
} else {
isDone = true

Loading…
Cancel
Save