|
@ -75,6 +75,24 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { |
|
|
|
|
|
|
|
|
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
|
|
|
|
|
|
|
|
|
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
|
|
|
|
|
|
var ts time.Time |
|
|
|
|
|
if processingTsNs == 0 { |
|
|
|
|
|
ts = time.Now() |
|
|
|
|
|
processingTsNs = ts.UnixNano() |
|
|
|
|
|
} else { |
|
|
|
|
|
ts = time.Unix(0, processingTsNs) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
|
|
|
TsNs: processingTsNs, // Will be updated if needed
|
|
|
|
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|
|
|
|
Data: data, |
|
|
|
|
|
Key: partitionKey, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logEntryData, _ := proto.Marshal(logEntry) |
|
|
|
|
|
|
|
|
var toFlush *dataToFlush |
|
|
var toFlush *dataToFlush |
|
|
logBuffer.Lock() |
|
|
logBuffer.Lock() |
|
|
defer func() { |
|
|
defer func() { |
|
@ -87,28 +105,16 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
// need to put the timestamp inside the lock
|
|
|
|
|
|
var ts time.Time |
|
|
|
|
|
if processingTsNs == 0 { |
|
|
|
|
|
ts = time.Now() |
|
|
|
|
|
processingTsNs = ts.UnixNano() |
|
|
|
|
|
} else { |
|
|
|
|
|
ts = time.Unix(0, processingTsNs) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Handle timestamp collision inside lock (rare case)
|
|
|
if logBuffer.LastTsNs.Load() >= processingTsNs { |
|
|
if logBuffer.LastTsNs.Load() >= processingTsNs { |
|
|
// this is unlikely to happen, but just in case
|
|
|
|
|
|
processingTsNs = logBuffer.LastTsNs.Add(1) |
|
|
processingTsNs = logBuffer.LastTsNs.Add(1) |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
ts = time.Unix(0, processingTsNs) |
|
|
|
|
|
// Re-marshal with corrected timestamp
|
|
|
|
|
|
logEntry.TsNs = processingTsNs |
|
|
|
|
|
logEntryData, _ = proto.Marshal(logEntry) |
|
|
|
|
|
} else { |
|
|
|
|
|
logBuffer.LastTsNs.Store(processingTsNs) |
|
|
} |
|
|
} |
|
|
logBuffer.LastTsNs.Store(processingTsNs) |
|
|
|
|
|
logEntry := &filer_pb.LogEntry{ |
|
|
|
|
|
TsNs: processingTsNs, |
|
|
|
|
|
PartitionKeyHash: util.HashToInt32(partitionKey), |
|
|
|
|
|
Data: data, |
|
|
|
|
|
Key: partitionKey, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
logEntryData, _ := proto.Marshal(logEntry) |
|
|
|
|
|
|
|
|
|
|
|
size := len(logEntryData) |
|
|
size := len(logEntryData) |
|
|
|
|
|
|
|
|