|
@ -44,7 +44,7 @@ type LogBuffer struct { |
|
|
notifyFn func() |
|
|
notifyFn func() |
|
|
isStopping *atomic.Bool |
|
|
isStopping *atomic.Bool |
|
|
flushChan chan *dataToFlush |
|
|
flushChan chan *dataToFlush |
|
|
lastTsNs int64 |
|
|
|
|
|
|
|
|
LastTsNs int64 |
|
|
sync.RWMutex |
|
|
sync.RWMutex |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -89,12 +89,12 @@ func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsN |
|
|
} else { |
|
|
} else { |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
} |
|
|
} |
|
|
if logBuffer.lastTsNs >= processingTsNs { |
|
|
|
|
|
|
|
|
if logBuffer.LastTsNs >= processingTsNs { |
|
|
// this is unlikely to happen, but just in case
|
|
|
// this is unlikely to happen, but just in case
|
|
|
processingTsNs = logBuffer.lastTsNs + 1 |
|
|
|
|
|
|
|
|
processingTsNs = logBuffer.LastTsNs + 1 |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
} |
|
|
} |
|
|
logBuffer.lastTsNs = processingTsNs |
|
|
|
|
|
|
|
|
logBuffer.LastTsNs = processingTsNs |
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
TsNs: processingTsNs, |
|
|
TsNs: processingTsNs, |
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|