diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index aff8ec80b..77077cc8f 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -33,6 +33,21 @@ type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int
 type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
 type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
 
+// DiskChunkCache caches chunks of historical data read from disk
+type DiskChunkCache struct {
+	mu        sync.RWMutex
+	chunks    map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
+	maxChunks int                        // Maximum number of chunks to cache
+}
+
+// CachedDiskChunk represents a cached chunk of disk data
+type CachedDiskChunk struct {
+	startOffset int64
+	endOffset   int64
+	messages    []*filer_pb.LogEntry
+	lastAccess  time.Time
+}
+
 type LogBuffer struct {
 	LastFlushTsNs int64
 	name          string
@@ -63,6 +78,8 @@ type LogBuffer struct {
 	hasOffsets        bool
 	lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
 	lastFlushedTime   atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
+	// Disk chunk cache for historical data reads
+	diskChunkCache *DiskChunkCache
 	sync.RWMutex
 }
 
@@ -81,6 +98,10 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
 		flushChan:  make(chan *dataToFlush, 256),
 		isStopping: new(atomic.Bool),
 		offset:     0, // Will be initialized from existing data if available
+		diskChunkCache: &DiskChunkCache{
+			chunks:    make(map[int64]*CachedDiskChunk),
+			maxChunks: 16, // Cache up to 16 chunks (hard-coded for now)
+		},
 	}
 	lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
 	go lb.loopFlush()
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go
index 2769bc2e8..e0db6aec1 100644
--- a/weed/util/log_buffer/log_read_stateless.go
+++ b/weed/util/log_buffer/log_read_stateless.go
@@ -173,6 +173,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages
 
 // readHistoricalDataFromDisk reads messages from disk for historical offsets
 // This is called when the requested offset is older than what's in memory
+// Uses an in-memory cache to avoid repeated disk I/O for the same chunks
 func readHistoricalDataFromDisk(
 	logBuffer *LogBuffer,
 	startOffset int64,
@@ -180,55 +181,154 @@ func readHistoricalDataFromDisk(
 	maxBytes int,
 	highWaterMark int64,
 ) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
-	messages = make([]*filer_pb.LogEntry, 0, maxMessages)
-	nextOffset = startOffset
-	totalBytes := 0
-	messageCount := 0
-
-	// Create a position for the start offset
-	startPosition := MessagePosition{
+	const chunkSize = 1000 // Number of messages per cached chunk
+
+	// Calculate the chunk start offset (aligned to a chunkSize boundary)
+	chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+	// Try the cache first
+	cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)
+
+	if cacheHit {
+		// Found in cache - extract the requested messages
+		glog.V(3).Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d)",
+			chunkStartOffset, startOffset)
+
+		return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
+	}
+
+	glog.V(3).Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk",
+		chunkStartOffset)
+
+	// Not in cache - read the entire chunk from disk so it can be cached
+	chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize)
+	chunkNextOffset := chunkStartOffset
+
+	// Create a position for the chunk start
+	chunkPosition := MessagePosition{
 		IsOffsetBased: true,
-		Offset:        startOffset,
+		Offset:        chunkStartOffset,
 	}
-
-	// Define the callback function to collect messages
+
+	// Define a callback that collects the entire chunk
 	eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
-		// Check if we've reached the maxMessages or maxBytes limit
-		if messageCount >= maxMessages {
-			return true, nil // Done, reached message limit
-		}
-
-		entrySize := proto.Size(logEntry)
-		if totalBytes > 0 && totalBytes+entrySize > maxBytes {
-			return true, nil // Done, would exceed byte limit
+		// Read up to chunkSize messages for caching
+		if len(chunkMessages) >= chunkSize {
+			return true, nil
 		}
-
-		// Add this message to the results
-		messages = append(messages, logEntry)
-		messageCount++
-		totalBytes += entrySize
-		nextOffset++
-
-		// Continue reading
+
+		chunkMessages = append(chunkMessages, logEntry)
+		chunkNextOffset++
+
+		// Continue reading the chunk
 		return false, nil
 	}
-
-	// Call the ReadFromDiskFn to read historical data
-	// This function should be non-blocking and have its own timeout handling
-	lastPosition, isDone, readErr := logBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
-
+
+	// Read the chunk from disk
+	_, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)
+
 	if readErr != nil {
 		return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
 	}
+
+	// Cache the chunk for future reads
+	if len(chunkMessages) > 0 {
+		cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
+		glog.V(3).Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
+			chunkStartOffset, chunkNextOffset-1, len(chunkMessages))
+	}
+
+	// Extract the requested messages from the chunk
+	return extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
+}
 
-	// Update nextOffset based on what was actually read
-	if lastPosition.IsOffsetBased {
-		nextOffset = lastPosition.Offset + 1 // Next offset to read
+// getCachedDiskChunk retrieves a cached disk chunk if available
+func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) {
+	logBuffer.diskChunkCache.mu.Lock()
+	defer logBuffer.diskChunkCache.mu.Unlock()
+
+	if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+		// Update last access time (a write, hence the full lock rather than RLock)
+		chunk.lastAccess = time.Now()
+		return chunk.messages, true
 	}
+
+	return nil, false
+}
 
-	glog.V(3).Infof("[DiskRead] Read %d messages from disk (offset %d to %d), isDone=%v",
-		len(messages), startOffset, nextOffset-1, isDone)
+// cacheDiskChunk stores a disk chunk in the cache with LRU eviction
+func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
+	logBuffer.diskChunkCache.mu.Lock()
+	defer logBuffer.diskChunkCache.mu.Unlock()
+
+	// Evict the least recently used chunk if the cache is full
+	if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks {
+		// Find the least recently used chunk
+		var oldestOffset int64
+		var oldestTime time.Time
+		first := true
+
+		for offset, chunk := range logBuffer.diskChunkCache.chunks {
+			if first || chunk.lastAccess.Before(oldestTime) {
+				oldestOffset = offset
+				oldestTime = chunk.lastAccess
+				first = false
+			}
+		}
+
+		// Evict the oldest chunk
+		delete(logBuffer.diskChunkCache.chunks, oldestOffset)
+		glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset)
+	}
+
+	// Store the new chunk
+	logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{
+		startOffset: startOffset,
+		endOffset:   endOffset,
+		messages:    messages,
+		lastAccess:  time.Now(),
+	}
+}
+
+// extractMessagesFromCache extracts the requested messages from a cached chunk.
+// chunkMessages contains messages starting from the chunk's aligned start offset,
+// so we skip forward to the requested startOffset within the chunk.
+func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) {
+	const chunkSize = 1000 // Must stay in sync with readHistoricalDataFromDisk
+	chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+	// Calculate the position within the chunk
+	positionInChunk := int(startOffset - chunkStartOffset)
+
+	// Check that the requested offset falls inside the chunk
+	if positionInChunk < 0 || positionInChunk >= len(chunkMessages) {
+		glog.V(4).Infof("[DiskCache] Requested offset %d out of chunk range (chunk start: %d, size: %d)",
+			startOffset, chunkStartOffset, len(chunkMessages))
+		return nil, startOffset, nil // Return empty but no error
+	}
+
+	// Extract messages starting from the requested position
+	messages := make([]*filer_pb.LogEntry, 0, maxMessages)
+	nextOffset := startOffset
+	totalBytes := 0
+
+	for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ {
+		entry := chunkMessages[i]
+		entrySize := proto.Size(entry)
+
+		// Stop before exceeding the byte limit; the first message is always returned
+		if totalBytes > 0 && totalBytes+entrySize > maxBytes {
+			break
+		}
+
+		messages = append(messages, entry)
+		totalBytes += entrySize
+		nextOffset++
+	}
+
+	glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)",
+		len(messages), startOffset, nextOffset-1, totalBytes)
 	return messages, nextOffset, nil
 }
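The cache key and the index into a cached chunk are both derived from the same alignment arithmetic, (startOffset / chunkSize) * chunkSize. A minimal self-contained sketch of that arithmetic, using illustrative offsets that are not taken from the PR's tests:

package main

import "fmt"

const chunkSize = 1000 // mirrors the const in readHistoricalDataFromDisk and extractMessagesFromCache

func main() {
	for _, startOffset := range []int64{0, 999, 1000, 1001, 2500} {
		chunkStart := (startOffset / chunkSize) * chunkSize // cache key: aligned chunk start
		posInChunk := startOffset - chunkStart              // index of the first requested message
		fmt.Printf("startOffset=%d chunkStart=%d posInChunk=%d\n", startOffset, chunkStart, posInChunk)
	}
}

Note that Go's integer division truncates toward zero, so this only aligns downward for non-negative offsets; the positionInChunk < 0 guard in extractMessagesFromCache covers the unexpected negative case.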
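The LRU path (evict the chunk with the oldest lastAccess once maxChunks is reached) can be exercised directly against the unexported helpers. A test sketch under stated assumptions: it would live in the log_buffer package, and the tiny maxChunks plus the Sleep call are test-only devices to make access times distinguishable:

package log_buffer

import (
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

func TestDiskChunkCacheLRUEviction(t *testing.T) {
	lb := &LogBuffer{
		diskChunkCache: &DiskChunkCache{
			chunks:    make(map[int64]*CachedDiskChunk),
			maxChunks: 2, // tiny cap to force eviction
		},
	}

	// Fill the cache to capacity with two chunks.
	cacheDiskChunk(lb, 0, 999, []*filer_pb.LogEntry{{}})
	cacheDiskChunk(lb, 1000, 1999, []*filer_pb.LogEntry{{}})

	// Touch chunk 0 so that chunk 1000 becomes the least recently used.
	time.Sleep(time.Millisecond)
	if _, hit := getCachedDiskChunk(lb, 0); !hit {
		t.Fatal("expected chunk 0 to be cached")
	}

	// A third chunk exceeds maxChunks and should evict chunk 1000.
	cacheDiskChunk(lb, 2000, 2999, []*filer_pb.LogEntry{{}})

	if _, hit := getCachedDiskChunk(lb, 1000); hit {
		t.Error("chunk at offset 1000 should have been evicted as LRU")
	}
	if _, hit := getCachedDiskChunk(lb, 0); !hit {
		t.Error("recently accessed chunk at offset 0 should have survived")
	}
}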