@@ -379,17 +379,40 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
	if logBuffer.LastTsNs.Load() >= processingTsNs {
		processingTsNs = logBuffer.LastTsNs.Add(1)
		ts = time.Unix(0, processingTsNs)
		// Re-marshal with corrected timestamp
		logEntry.TsNs = processingTsNs
		logEntryData, _ = proto.Marshal(logEntry)
	} else {
		logBuffer.LastTsNs.Store(processingTsNs)
	}

	// CRITICAL FIX: Set the offset in the LogEntry before marshaling
	// This ensures the flushed data contains the correct offset information
	// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
	logEntry.Offset = logBuffer.offset

	// Marshal with correct timestamp and offset
	logEntryData, _ = proto.Marshal(logEntry)

	size := len(logEntryData)

	if logBuffer.pos == 0 {
		logBuffer.startTime = ts
		// Reset offset tracking for new buffer
		logBuffer.hasOffsets = false
	}

	// Track offset ranges for Kafka integration
	// CRITICAL FIX: Track the current offset being written
	if !logBuffer.hasOffsets {
		logBuffer.minOffset = logBuffer.offset
		logBuffer.maxOffset = logBuffer.offset
		logBuffer.hasOffsets = true
	} else {
		if logBuffer.offset < logBuffer.minOffset {
			logBuffer.minOffset = logBuffer.offset
		}
		if logBuffer.offset > logBuffer.maxOffset {
			logBuffer.maxOffset = logBuffer.offset
		}
	}

	if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
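
// Editor's illustration (not part of the change): the LastTsNs handling above
// is an atomic high-water mark that keeps per-buffer timestamps strictly
// increasing even when the clock reports the same or an earlier nanosecond.
// A minimal standalone sketch of the same idea, assuming a single writer
// (the real method runs under the buffer's lock):

package main

import (
	"fmt"
	"sync/atomic"
)

var lastTsNs atomic.Int64

// assignTs returns a strictly increasing timestamp: if the wall clock has
// not advanced past the previous entry, bump one nanosecond past it.
func assignTs(wallClockNs int64) int64 {
	if lastTsNs.Load() >= wallClockNs {
		return lastTsNs.Add(1)
	}
	lastTsNs.Store(wallClockNs)
	return wallClockNs
}

func main() {
	fmt.Println(assignTs(1000), assignTs(1000), assignTs(999)) // 1000 1001 1002
}
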
@@ -417,6 +440,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
	copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
	logBuffer.pos += size + 4

	logBuffer.offset++
}
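
// Editor's illustration (not part of the change): the pos arithmetic above
// implies each entry is stored as a 4-byte length prefix followed by the
// marshaled bytes. A minimal sketch of that framing; the big-endian byte
// order and the helper names here are assumptions, not this codebase's API:

package main

import (
	"encoding/binary"
	"fmt"
)

// appendFramed appends one entry as [4-byte length][payload], so a reader
// can walk the buffer without any out-of-band index.
func appendFramed(buf, payload []byte) []byte {
	var size [4]byte
	binary.BigEndian.PutUint32(size[:], uint32(len(payload)))
	return append(append(buf, size[:]...), payload...)
}

// readFramed splits a framed buffer back into individual payloads.
func readFramed(buf []byte) [][]byte {
	var out [][]byte
	for pos := 0; pos+4 <= len(buf); {
		size := int(binary.BigEndian.Uint32(buf[pos : pos+4]))
		out = append(out, buf[pos+4:pos+4+size])
		pos += size + 4
	}
	return out
}

func main() {
	buf := appendFramed(appendFramed(nil, []byte("first")), []byte("second"))
	for _, p := range readFramed(buf) {
		fmt.Printf("%s\n", p) // first, then second
	}
}
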

func (logBuffer *LogBuffer) IsStopping() bool {

@@ -560,12 +584,12 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
		logBuffer.hasOffsets = false
		logBuffer.minOffset = 0
		logBuffer.maxOffset = 0

		// CRITICAL FIX: Invalidate disk cache chunks after flush
		// The cache may contain stale data from before this flush
		// Invalidating ensures consumers will re-read fresh data from disk after flush
		logBuffer.invalidateAllDiskCacheChunks()

		return d
	}
	return nil
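
// Editor's illustration (not part of the change): one way the minOffset and
// maxOffset values captured before the reset above can be consumed downstream
// for Kafka-style reads. FlushedChunk and Contains are hypothetical names:

package main

import "fmt"

// FlushedChunk records the offset range covered by one flushed buffer.
type FlushedChunk struct {
	MinOffset int64
	MaxOffset int64
}

// Contains reports whether a requested offset falls inside this chunk,
// so a reader can locate the right chunk without scanning its entries.
func (c FlushedChunk) Contains(offset int64) bool {
	return offset >= c.MinOffset && offset <= c.MaxOffset
}

func main() {
	chunk := FlushedChunk{MinOffset: 100, MaxOffset: 199}
	fmt.Println(chunk.Contains(150), chunk.Contains(200)) // true false
}
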

@@ -576,7 +600,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
	logBuffer.diskChunkCache.mu.Lock()
	defer logBuffer.diskChunkCache.mu.Unlock()

	if len(logBuffer.diskChunkCache.chunks) > 0 {
		glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks))
		logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
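
// Editor's illustration (not part of the change): the hunk below drops the
// offset comparison and decides memory-vs-disk on time alone. A minimal
// sketch of the resulting decision, with hypothetical names for the function
// and its parameters:

package main

import (
	"fmt"
	"time"
)

// readSource classifies where a subscriber positioned at lastReadTime should
// resume, given the earliest timestamp still held in memory (zero if none).
// It mirrors cases 2.1 (memory), 2.2 (nothing buffered) and 2.3 (disk) below.
func readSource(lastReadTime, earliestInMemory time.Time, firstRead bool) string {
	switch {
	case earliestInMemory.IsZero():
		return "none" // case 2.2: nothing buffered yet
	case lastReadTime.IsZero() || lastReadTime.Unix() == 0:
		return "memory" // zero time means "start from beginning"
	case firstRead && lastReadTime.Before(earliestInMemory):
		return "memory" // first read racing the first write stays in memory
	case lastReadTime.Before(earliestInMemory):
		return "disk" // case 2.3: data already rotated out of memory
	default:
		return "memory" // case 2.1
	}
}

func main() {
	now := time.Now()
	fmt.Println(readSource(time.Time{}, now, false))         // memory
	fmt.Println(readSource(now.Add(-time.Hour), now, false)) // disk
}
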
@@ -680,25 +704,31 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
	// if td < tm, case 2.3
	// read from disk again
	var tsMemory time.Time
	var tsBatchIndex int64
	if !logBuffer.startTime.IsZero() {
		tsMemory = logBuffer.startTime
		tsBatchIndex = logBuffer.offset
	}
	for _, prevBuf := range logBuffer.prevBuffers.buffers {
		if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
			tsMemory = prevBuf.startTime
			tsBatchIndex = prevBuf.offset
		}
	}
	if tsMemory.IsZero() { // case 2.2
		return nil, -2, nil
-	} else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+	} else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
		// CRITICAL FIX: For time-based reads, only check timestamp for disk reads
		// Don't use offset comparisons as they're not meaningful for time-based subscriptions

		// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
		// This handles queries that want to read all data without knowing the exact start time
		if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
			// Start from the beginning of memory
			// Fall through to case 2.1 to read from earliest buffer
		} else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) {
			// CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory,
			// it's likely a race between starting to read and first message being written
			// Fall through to case 2.1 to read from earliest buffer instead of triggering disk read
			glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory",
				lastReadPosition.Time, tsMemory)
		} else {
			// Data not in memory buffers - read from disk
			glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",