|
|
|
@ -245,24 +245,28 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { |
|
|
|
logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) |
|
|
|
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error { |
|
|
|
return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) |
|
|
|
} |
|
|
|
|
|
|
|
// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
|
|
|
|
func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { |
|
|
|
func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error { |
|
|
|
var toFlush *dataToFlush |
|
|
|
var marshalErr error |
|
|
|
logBuffer.Lock() |
|
|
|
defer func() { |
|
|
|
logBuffer.Unlock() |
|
|
|
if toFlush != nil { |
|
|
|
logBuffer.flushChan <- toFlush |
|
|
|
} |
|
|
|
if logBuffer.notifyFn != nil { |
|
|
|
logBuffer.notifyFn() |
|
|
|
// Only notify if there was no error
|
|
|
|
if marshalErr == nil { |
|
|
|
if logBuffer.notifyFn != nil { |
|
|
|
logBuffer.notifyFn() |
|
|
|
} |
|
|
|
// Notify all registered subscribers instantly (<1ms latency)
|
|
|
|
logBuffer.notifySubscribers() |
|
|
|
} |
|
|
|
// Notify all registered subscribers instantly (<1ms latency)
|
|
|
|
logBuffer.notifySubscribers() |
|
|
|
}() |
|
|
|
|
|
|
|
processingTsNs := logEntry.TsNs |
|
|
|
@ -278,10 +282,11 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { |
|
|
|
logBuffer.LastTsNs.Store(processingTsNs) |
|
|
|
} |
|
|
|
|
|
|
|
logEntryData, marshalErr := proto.Marshal(logEntry) |
|
|
|
if marshalErr != nil { |
|
|
|
glog.Errorf("Failed to marshal LogEntry: %v", marshalErr) |
|
|
|
return |
|
|
|
logEntryData, err := proto.Marshal(logEntry) |
|
|
|
if err != nil { |
|
|
|
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) |
|
|
|
glog.Errorf("%v", marshalErr) |
|
|
|
return marshalErr |
|
|
|
} |
|
|
|
size := len(logEntryData) |
|
|
|
|
|
|
|
@ -316,7 +321,9 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { |
|
|
|
const maxBufferSize = 1 << 30 // 1 GiB practical limit
|
|
|
|
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
|
|
|
|
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { |
|
|
|
return |
|
|
|
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) |
|
|
|
glog.Errorf("%v", marshalErr) |
|
|
|
return marshalErr |
|
|
|
} |
|
|
|
// Safe to compute now that we've validated size is in valid range
|
|
|
|
newSize := 2*size + 4 |
|
|
|
@ -332,9 +339,10 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { |
|
|
|
logBuffer.pos += size + 4 |
|
|
|
|
|
|
|
logBuffer.offset++ |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { |
|
|
|
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error { |
|
|
|
|
|
|
|
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
|
|
|
|
var ts time.Time |
|
|
|
@ -353,17 +361,21 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin |
|
|
|
} |
|
|
|
|
|
|
|
var toFlush *dataToFlush |
|
|
|
var marshalErr error |
|
|
|
logBuffer.Lock() |
|
|
|
defer func() { |
|
|
|
logBuffer.Unlock() |
|
|
|
if toFlush != nil { |
|
|
|
logBuffer.flushChan <- toFlush |
|
|
|
} |
|
|
|
if logBuffer.notifyFn != nil { |
|
|
|
logBuffer.notifyFn() |
|
|
|
// Only notify if there was no error
|
|
|
|
if marshalErr == nil { |
|
|
|
if logBuffer.notifyFn != nil { |
|
|
|
logBuffer.notifyFn() |
|
|
|
} |
|
|
|
// Notify all registered subscribers instantly (<1ms latency)
|
|
|
|
logBuffer.notifySubscribers() |
|
|
|
} |
|
|
|
// Notify all registered subscribers instantly (<1ms latency)
|
|
|
|
logBuffer.notifySubscribers() |
|
|
|
}() |
|
|
|
|
|
|
|
// Handle timestamp collision inside lock (rare case)
|
|
|
|
@ -381,10 +393,11 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin |
|
|
|
logEntry.Offset = logBuffer.offset |
|
|
|
|
|
|
|
// Marshal with correct timestamp and offset
|
|
|
|
logEntryData, marshalErr := proto.Marshal(logEntry) |
|
|
|
if marshalErr != nil { |
|
|
|
glog.Errorf("Failed to marshal LogEntry: %v", marshalErr) |
|
|
|
return |
|
|
|
logEntryData, err := proto.Marshal(logEntry) |
|
|
|
if err != nil { |
|
|
|
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) |
|
|
|
glog.Errorf("%v", marshalErr) |
|
|
|
return marshalErr |
|
|
|
} |
|
|
|
|
|
|
|
size := len(logEntryData) |
|
|
|
@ -418,7 +431,9 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin |
|
|
|
const maxBufferSize = 1 << 30 // 1 GiB practical limit
|
|
|
|
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
|
|
|
|
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { |
|
|
|
return |
|
|
|
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) |
|
|
|
glog.Errorf("%v", marshalErr) |
|
|
|
return marshalErr |
|
|
|
} |
|
|
|
// Safe to compute now that we've validated size is in valid range
|
|
|
|
newSize := 2*size + 4 |
|
|
|
@ -434,6 +449,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin |
|
|
|
logBuffer.pos += size + 4 |
|
|
|
|
|
|
|
logBuffer.offset++ |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (logBuffer *LogBuffer) IsStopping() bool { |
|
|
|
@ -851,15 +867,15 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) { |
|
|
|
} |
|
|
|
|
|
|
|
func readTs(buf []byte, pos int) (size int, ts int64, err error) { |
|
|
|
// Bounds check for size field
|
|
|
|
if pos+4 > len(buf) { |
|
|
|
// Bounds check for size field (overflow-safe)
|
|
|
|
if pos < 0 || pos > len(buf)-4 { |
|
|
|
return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf)) |
|
|
|
} |
|
|
|
|
|
|
|
size = int(util.BytesToUint32(buf[pos : pos+4])) |
|
|
|
|
|
|
|
// Bounds check for entry data
|
|
|
|
if pos+4+size > len(buf) { |
|
|
|
// Bounds check for entry data (overflow-safe, protects against negative size)
|
|
|
|
if size < 0 || size > len(buf)-pos-4 { |
|
|
|
return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf)) |
|
|
|
} |
|
|
|
|
|
|
|
|