|
@ -56,7 +56,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi |
|
|
return lb |
|
|
return lb |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { |
|
|
|
|
|
|
|
|
func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
|
|
|
|
|
|
m.Lock() |
|
|
m.Lock() |
|
|
defer func() { |
|
|
defer func() { |
|
@ -68,20 +68,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { |
|
|
|
|
|
|
|
|
// need to put the timestamp inside the lock
|
|
|
// need to put the timestamp inside the lock
|
|
|
var ts time.Time |
|
|
var ts time.Time |
|
|
if eventTsNs == 0 { |
|
|
|
|
|
|
|
|
if processingTsNs == 0 { |
|
|
ts = time.Now() |
|
|
ts = time.Now() |
|
|
eventTsNs = ts.UnixNano() |
|
|
|
|
|
|
|
|
processingTsNs = ts.UnixNano() |
|
|
} else { |
|
|
} else { |
|
|
ts = time.Unix(0, eventTsNs) |
|
|
|
|
|
|
|
|
ts = time.Unix(0, processingTsNs) |
|
|
} |
|
|
} |
|
|
if m.lastTsNs >= eventTsNs { |
|
|
|
|
|
|
|
|
if m.lastTsNs >= processingTsNs { |
|
|
// this is unlikely to happen, but just in case
|
|
|
// this is unlikely to happen, but just in case
|
|
|
eventTsNs = m.lastTsNs + 1 |
|
|
|
|
|
ts = time.Unix(0, eventTsNs) |
|
|
|
|
|
|
|
|
processingTsNs = m.lastTsNs + 1 |
|
|
|
|
|
ts = time.Unix(0, processingTsNs) |
|
|
} |
|
|
} |
|
|
m.lastTsNs = eventTsNs |
|
|
|
|
|
|
|
|
m.lastTsNs = processingTsNs |
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
TsNs: eventTsNs, |
|
|
|
|
|
|
|
|
TsNs: processingTsNs, |
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|
Data: data, |
|
|
Data: data, |
|
|
} |
|
|
} |
|
|