diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 50449db31..9defae932 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -379,17 +379,40 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin if logBuffer.LastTsNs.Load() >= processingTsNs { processingTsNs = logBuffer.LastTsNs.Add(1) ts = time.Unix(0, processingTsNs) - // Re-marshal with corrected timestamp logEntry.TsNs = processingTsNs - logEntryData, _ = proto.Marshal(logEntry) } else { logBuffer.LastTsNs.Store(processingTsNs) } + // CRITICAL FIX: Set the offset in the LogEntry before marshaling + // This ensures the flushed data contains the correct offset information + // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads + logEntry.Offset = logBuffer.offset + + // Marshal with correct timestamp and offset + logEntryData, _ = proto.Marshal(logEntry) + size := len(logEntryData) if logBuffer.pos == 0 { logBuffer.startTime = ts + // Reset offset tracking for new buffer + logBuffer.hasOffsets = false + } + + // Track offset ranges for Kafka integration + // CRITICAL FIX: Track the current offset being written + if !logBuffer.hasOffsets { + logBuffer.minOffset = logBuffer.offset + logBuffer.maxOffset = logBuffer.offset + logBuffer.hasOffsets = true + } else { + if logBuffer.offset < logBuffer.minOffset { + logBuffer.minOffset = logBuffer.offset + } + if logBuffer.offset > logBuffer.maxOffset { + logBuffer.maxOffset = logBuffer.offset + } } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { @@ -417,6 +440,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) logBuffer.pos += size + 4 + logBuffer.offset++ } func (logBuffer *LogBuffer) IsStopping() bool { @@ -560,12 +584,12 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush logBuffer.hasOffsets = false logBuffer.minOffset = 0 logBuffer.maxOffset = 0 - + // CRITICAL FIX: Invalidate disk cache chunks after flush // The cache may contain stale data from before this flush // Invalidating ensures consumers will re-read fresh data from disk after flush logBuffer.invalidateAllDiskCacheChunks() - + return d } return nil @@ -576,7 +600,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() { logBuffer.diskChunkCache.mu.Lock() defer logBuffer.diskChunkCache.mu.Unlock() - + if len(logBuffer.diskChunkCache.chunks) > 0 { glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks)) logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk) @@ -680,25 +704,31 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // if td < tm, case 2.3 // read from disk again var tsMemory time.Time - var tsBatchIndex int64 if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime - tsBatchIndex = logBuffer.offset } for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime - tsBatchIndex = prevBuf.offset } } if tsMemory.IsZero() { // case 2.2 return nil, -2, nil - } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3 + } else if lastReadPosition.Time.Before(tsMemory) { // case 2.3 + // CRITICAL FIX: For time-based reads, only check timestamp for disk reads + // Don't use offset comparisons as they're not meaningful for time-based subscriptions + // Special case: If requested time is zero (Unix epoch), treat as "start from beginning" // This handles queries that want to read all data without knowing the exact start time if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 { // Start from the beginning of memory // Fall through to case 2.1 to read from earliest buffer + } else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) { + // CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory, + // it's likely a race between starting to read and first message being written + // Fall through to case 2.1 to read from earliest buffer instead of triggering disk read + glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory", + lastReadPosition.Time, tsMemory) } else { // Data not in memory buffers - read from disk glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",