From 7e46abf052b029bfd703a6f73ad9b16dea8f25bc Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 15 Oct 2025 22:00:38 -0700 Subject: [PATCH] fmt --- weed/util/log_buffer/log_buffer.go | 6 +-- weed/util/log_buffer/log_read_stateless.go | 56 +++++++++++----------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 77077cc8f..89b0627a1 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -35,9 +35,9 @@ type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, // DiskChunkCache caches chunks of historical data read from disk type DiskChunkCache struct { - mu sync.RWMutex - chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize) - maxChunks int // Maximum number of chunks to cache + mu sync.RWMutex + chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize) + maxChunks int // Maximum number of chunks to cache } // CachedDiskChunk represents a cached chunk of disk data diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index e0db6aec1..5979bae1c 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -182,62 +182,62 @@ func readHistoricalDataFromDisk( highWaterMark int64, ) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { const chunkSize = 1000 // Size of each cached chunk - + // Calculate chunk start offset (aligned to chunkSize boundary) chunkStartOffset := (startOffset / chunkSize) * chunkSize - + // Try to get from cache first cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset) - + if cacheHit { // Found in cache - extract requested messages glog.V(3).Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d)", chunkStartOffset, startOffset) - + return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) } - + glog.V(3).Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk", chunkStartOffset) - + // Not in cache - read entire chunk from disk for caching chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize) chunkNextOffset := chunkStartOffset - + // Create a position for the chunk start chunkPosition := MessagePosition{ IsOffsetBased: true, Offset: chunkStartOffset, } - + // Define callback to collect the entire chunk eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { // Read up to chunkSize messages for caching if len(chunkMessages) >= chunkSize { return true, nil } - + chunkMessages = append(chunkMessages, logEntry) chunkNextOffset++ - + // Continue reading the chunk return false, nil } - + // Read chunk from disk _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn) - + if readErr != nil { return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) } - + // Cache the chunk for future reads if len(chunkMessages) > 0 { cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages) glog.V(3).Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", chunkStartOffset, chunkNextOffset-1, len(chunkMessages)) } - + // Extract requested messages from the chunk return extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) } @@ -246,13 +246,13 @@ func readHistoricalDataFromDisk( func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) { logBuffer.diskChunkCache.mu.RLock() defer logBuffer.diskChunkCache.mu.RUnlock() - + if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { // Update last access time chunk.lastAccess = time.Now() return chunk.messages, true } - + return nil, false } @@ -260,14 +260,14 @@ func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_ func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) { logBuffer.diskChunkCache.mu.Lock() defer logBuffer.diskChunkCache.mu.Unlock() - + // Check if we need to evict old chunks (LRU policy) if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks { // Find least recently used chunk var oldestOffset int64 var oldestTime time.Time first := true - + for offset, chunk := range logBuffer.diskChunkCache.chunks { if first || chunk.lastAccess.Before(oldestTime) { oldestOffset = offset @@ -275,12 +275,12 @@ func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages first = false } } - + // Evict oldest chunk delete(logBuffer.diskChunkCache.chunks, oldestOffset) glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset) } - + // Store new chunk logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{ startOffset: startOffset, @@ -296,39 +296,39 @@ func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) { const chunkSize = 1000 chunkStartOffset := (startOffset / chunkSize) * chunkSize - + // Calculate position within chunk positionInChunk := int(startOffset - chunkStartOffset) - + // Check if requested offset is within the chunk if positionInChunk < 0 || positionInChunk >= len(chunkMessages) { glog.V(4).Infof("[DiskCache] Requested offset %d out of chunk range (chunk start: %d, size: %d)", startOffset, chunkStartOffset, len(chunkMessages)) return nil, startOffset, nil // Return empty but no error } - + // Extract messages starting from the requested position messages := make([]*filer_pb.LogEntry, 0, maxMessages) nextOffset := startOffset totalBytes := 0 - + for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ { entry := chunkMessages[i] entrySize := proto.Size(entry) - + // Check byte limit if totalBytes > 0 && totalBytes+entrySize > maxBytes { break } - + messages = append(messages, entry) totalBytes += entrySize nextOffset++ } - + glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)", len(messages), startOffset, nextOffset-1, totalBytes) - + return messages, nextOffset, nil }