diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go index dafb0abd1..09503fb3b 100644 --- a/weed/mq/broker/broker_grpc_fetch.go +++ b/weed/mq/broker/broker_grpc_fetch.go @@ -105,15 +105,25 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM requestedOffset := req.StartOffset // Read messages from LogBuffer (stateless read) - glog.V(3).Infof("[FetchMessage] About to read from LogBuffer at offset %d (requested=%d)", requestedOffset, req.StartOffset) + glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d", + t.Name, partition, requestedOffset, maxMessages, maxBytes) + logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset( requestedOffset, maxMessages, maxBytes, ) - glog.V(3).Infof("[FetchMessage] Read completed: %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v", - len(logEntries), nextOffset, highWaterMark, endOfPartition, err) + // CRITICAL: Log the result with full details + if len(logEntries) == 0 && highWaterMark > requestedOffset && err == nil { + glog.Errorf("[FetchMessage] CRITICAL: ReadMessagesAtOffset returned 0 entries but HWM=%d > requestedOffset=%d (should return data!)", + highWaterMark, requestedOffset) + glog.Errorf("[FetchMessage] Details: nextOffset=%d, endOfPartition=%v, bufferStartOffset=%d", + nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset()) + } + + glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v", + t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err) if err != nil { // Check if this is an "offset out of range" error diff --git a/weed/mq/kafka/integration/broker_client_fetch.go b/weed/mq/kafka/integration/broker_client_fetch.go index 51517d224..25af9e809 100644 --- a/weed/mq/kafka/integration/broker_client_fetch.go +++ b/weed/mq/kafka/integration/broker_client_fetch.go @@ -83,7 +83,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string // CRITICAL DEBUGGING: Log what broker returned glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v", topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition) - + // CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset { glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)", @@ -103,7 +103,7 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string Offset: startOffset + int64(i), // Sequential offset assignment } records = append(records, record) - + // Log each message for debugging glog.V(4).Infof("[FETCH-STATELESS-CLIENT] Message %d: offset=%d, keyLen=%d, valueLen=%d", i, record.Offset, len(msg.Key), len(msg.Value)) diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index e2dd8eee8..e44664f62 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -34,7 +34,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages endOfPartition bool, err error, ) { - glog.V(4).Infof("[StatelessRead] Reading from offset %d, maxMessages=%d, maxBytes=%d", + glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d", startOffset, maxMessages, maxBytes) // Quick validation @@ -54,10 +54,13 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages bufferStartOffset := logBuffer.bufferStartOffset highWaterMark = currentBufferEnd + glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d", + startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos) + // Special case: empty buffer (no data written yet) if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 { logBuffer.RUnlock() - glog.V(4).Infof("[StatelessRead] Empty buffer, returning no data with endOfPartition=true") + glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)") // Return empty result - partition exists but has no data yet // Preserve the requested offset in nextOffset return messages, startOffset, 0, true, nil @@ -65,6 +68,7 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // Check if requested offset is in current buffer if startOffset >= bufferStartOffset && startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers") // Read from current buffer glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d", bufferStartOffset, currentBufferEnd) @@ -139,38 +143,48 @@ func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages // 1. startOffset < bufferStartOffset: Historical data // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above) if startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ") + // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured if startOffset < bufferStartOffset { - glog.V(2).Infof("[StatelessRead] Requested offset %d < buffer start %d (historical data), attempting disk read", + glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d", startOffset, bufferStartOffset) } else { - glog.V(2).Infof("[StatelessRead] Requested offset %d in range but not in memory (flushed data), attempting disk read", - startOffset) + glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory", + startOffset, bufferStartOffset, currentBufferEnd) } // Check if disk read function is configured if logBuffer.ReadFromDiskFn == nil { - glog.V(2).Infof("[StatelessRead] No disk read function configured, returning error") + glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.") if startOffset < bufferStartOffset { - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d)", + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil", startOffset, bufferStartOffset) } - return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), no disk read available", + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil", startOffset, bufferStartOffset, currentBufferEnd) } + glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...") + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) // The ReadFromDiskFn should handle its own timeouts and not block indefinitely diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) if diskErr != nil { - glog.V(2).Infof("[StatelessRead] Disk read failed for offset %d: %v", startOffset, diskErr) + glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr) + // IMPORTANT: Return retryable error instead of silently returning empty! return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr) } - glog.V(2).Infof("[StatelessRead] Successfully read %d messages from disk, nextOffset=%d", - len(diskMessages), diskNextOffset) + if len(diskMessages) == 0 { + glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)", + startOffset, highWaterMark, bufferStartOffset) + } else { + glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d", + len(diskMessages), diskNextOffset) + } // Return disk data endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages @@ -195,21 +209,26 @@ func readHistoricalDataFromDisk( ) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { const chunkSize = 1000 // Size of each cached chunk + glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d", + startOffset, maxMessages, maxBytes, highWaterMark) + // Calculate chunk start offset (aligned to chunkSize boundary) chunkStartOffset := (startOffset / chunkSize) * chunkSize + glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset) + // Try to get from cache first cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset) if cacheHit { // Found in cache - extract requested messages - glog.V(3).Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d)", - chunkStartOffset, startOffset) + glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d", + chunkStartOffset, startOffset, len(cachedMessages)) return extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) } - glog.V(3).Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk", + glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn", chunkStartOffset) // Not in cache - read entire chunk from disk for caching @@ -237,21 +256,29 @@ func readHistoricalDataFromDisk( } // Read chunk from disk + glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset) _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn) if readErr != nil { + glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr) return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) } + glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages)) + // Cache the chunk for future reads if len(chunkMessages) > 0 { cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages) - glog.V(3).Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", + glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", chunkStartOffset, chunkNextOffset-1, len(chunkMessages)) + } else { + glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset) } // Extract requested messages from the chunk - return extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) + result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) + glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr) + return result, resNextOffset, resErr } // getCachedDiskChunk retrieves a cached disk chunk if available