Browse Source

fmt

pull/7329/head
chrislu 5 days ago
parent
commit
7e46abf052
  1. 6
      weed/util/log_buffer/log_buffer.go
  2. 56
      weed/util/log_buffer/log_read_stateless.go

6
weed/util/log_buffer/log_buffer.go

@ -35,9 +35,9 @@ type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64,
// DiskChunkCache caches chunks of historical data read from disk
type DiskChunkCache struct {
mu sync.RWMutex
chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
maxChunks int // Maximum number of chunks to cache
mu sync.RWMutex
chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
maxChunks int // Maximum number of chunks to cache
}
// CachedDiskChunk represents a cached chunk of disk data

56
weed/util/log_buffer/log_read_stateless.go

@ -182,62 +182,62 @@ func readHistoricalDataFromDisk(
highWaterMark int64,
) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
const chunkSize = 1000 // Size of each cached chunk
// Calculate chunk start offset (aligned to chunkSize boundary)
chunkStartOffset := (startOffset / chunkSize) * chunkSize
// Try to get from cache first
cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)
if cacheHit {
// Found in cache - extract requested messages
glog.V(3).Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d)",
chunkStartOffset, startOffset)
return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
}
glog.V(3).Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk",
chunkStartOffset)
// Not in cache - read entire chunk from disk for caching
chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize)
chunkNextOffset := chunkStartOffset
// Create a position for the chunk start
chunkPosition := MessagePosition{
IsOffsetBased: true,
Offset: chunkStartOffset,
}
// Define callback to collect the entire chunk
eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
// Read up to chunkSize messages for caching
if len(chunkMessages) >= chunkSize {
return true, nil
}
chunkMessages = append(chunkMessages, logEntry)
chunkNextOffset++
// Continue reading the chunk
return false, nil
}
// Read chunk from disk
_, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)
if readErr != nil {
return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
}
// Cache the chunk for future reads
if len(chunkMessages) > 0 {
cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
glog.V(3).Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
chunkStartOffset, chunkNextOffset-1, len(chunkMessages))
}
// Extract requested messages from the chunk
return extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
}
@ -246,13 +246,13 @@ func readHistoricalDataFromDisk(
func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) {
logBuffer.diskChunkCache.mu.RLock()
defer logBuffer.diskChunkCache.mu.RUnlock()
if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
// Update last access time
chunk.lastAccess = time.Now()
return chunk.messages, true
}
return nil, false
}
@ -260,14 +260,14 @@ func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_
func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
logBuffer.diskChunkCache.mu.Lock()
defer logBuffer.diskChunkCache.mu.Unlock()
// Check if we need to evict old chunks (LRU policy)
if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks {
// Find least recently used chunk
var oldestOffset int64
var oldestTime time.Time
first := true
for offset, chunk := range logBuffer.diskChunkCache.chunks {
if first || chunk.lastAccess.Before(oldestTime) {
oldestOffset = offset
@ -275,12 +275,12 @@ func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages
first = false
}
}
// Evict oldest chunk
delete(logBuffer.diskChunkCache.chunks, oldestOffset)
glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset)
}
// Store new chunk
logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{
startOffset: startOffset,
@ -296,39 +296,39 @@ func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages
func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) {
const chunkSize = 1000
chunkStartOffset := (startOffset / chunkSize) * chunkSize
// Calculate position within chunk
positionInChunk := int(startOffset - chunkStartOffset)
// Check if requested offset is within the chunk
if positionInChunk < 0 || positionInChunk >= len(chunkMessages) {
glog.V(4).Infof("[DiskCache] Requested offset %d out of chunk range (chunk start: %d, size: %d)",
startOffset, chunkStartOffset, len(chunkMessages))
return nil, startOffset, nil // Return empty but no error
}
// Extract messages starting from the requested position
messages := make([]*filer_pb.LogEntry, 0, maxMessages)
nextOffset := startOffset
totalBytes := 0
for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ {
entry := chunkMessages[i]
entrySize := proto.Size(entry)
// Check byte limit
if totalBytes > 0 && totalBytes+entrySize > maxBytes {
break
}
messages = append(messages, entry)
totalBytes += entrySize
nextOffset++
}
glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)",
len(messages), startOffset, nextOffset-1, totalBytes)
return messages, nextOffset, nil
}

Loading…
Cancel
Save