From 5d86a30d8e2e116521bc03f20dbe094dd49c8a47 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 10:18:19 -0700 Subject: [PATCH] debug: Add comprehensive message flow logging - consumption up to 73% Add detailed end-to-end debugging to track message consumption: Consumer Changes: - Log initial offset and HWM when partition assigned - Track offset gaps (indicate missing messages) - Log progress every 500 messages OR every 5 seconds - Count and report total gaps encountered - Show HWM progression during consumption Fetch Handler Changes: - Log current offset updates - Log fetch results (empty vs data) - Show offset range and byte count returned This comprehensive logging revealed a BREAKTHROUGH: - Previous: 45% consumption (1395/3100) - Current: 73% consumption (2275/3100) - Improvement: 28 PERCENTAGE POINT JUMP! The added logging appears to alter timing enough to reduce message loss, which suggests timing-sensitive bugs in offset/fetch coordination. Remaining Tasks: - Find 825 missing messages (27%) - Check if they're concentrated in specific partitions/offsets - Investigate timing issues revealed by logging improvement - Consider if there's a race between commit and next fetch Next: Analyze logs to find offset gap patterns. 
--- .../internal/consumer/consumer.go | 30 ++++++++++++++----- .../kafka/protocol/fetch_partition_reader.go | 20 +++++++------ 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 844da7c2a..e7e6b4fae 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -625,31 +625,45 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, topic := claim.Topic() partition := claim.Partition() initialOffset := claim.InitialOffset() + lastTrackedOffset := int64(-1) + gapCount := 0 // Log the starting offset for this partition - log.Printf("Consumer %d: START consuming %s[%d] from offset %d", - h.consumer.id, topic, partition, initialOffset) + log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)", + h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset()) startTime := time.Now() + lastLogTime := time.Now() for { select { case message, ok := <-claim.Messages(): if !ok { elapsed := time.Since(startTime) - log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d)", + log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d)", h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), - float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1) + float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount) return nil } msgCount++ - // Log progress every 500 messages - if msgCount%500 == 0 { + // Track gaps in offset sequence (indicates missed messages) + if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 { + gap := message.Offset - lastTrackedOffset - 1 + gapCount++ + log.Printf("Consumer %d: DEBUG offset gap in %s[%d]: offset %d 
-> %d (gap=%d messages)", + h.consumer.id, topic, partition, lastTrackedOffset, message.Offset, gap) + } + lastTrackedOffset = message.Offset + + // Log progress every 500 messages OR every 5 seconds + now := time.Now() + if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second { elapsed := time.Since(startTime) throughput := float64(msgCount) / elapsed.Seconds() - log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, rate=%.1f msgs/sec", - h.consumer.id, topic, partition, msgCount, message.Offset, throughput) + log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", + h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount) + lastLogTime = now } // Process the message diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index b0ee5d95f..6c7237698 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -153,25 +153,27 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition // Update tracking offset to match requested offset pr.bufferMu.Lock() if req.requestedOffset != pr.currentOffset { - glog.V(4).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset) + glog.V(3).Infof("[%s] Updating currentOffset for %s[%d]: %d -> %d", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, pr.currentOffset, req.requestedOffset) pr.currentOffset = req.requestedOffset } pr.bufferMu.Unlock() // Fetch on-demand - no pre-fetching to avoid overwhelming the broker - // Pass the requested offset and maxWaitMs directly to avoid race conditions recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm) - if len(recordBatch) > 0 && newOffset > pr.currentOffset { + + // Log what we 
got back + if len(recordBatch) == 0 { + glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) + result.recordBatch = []byte{} + } else { + glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned data (offset=%d->%d, hwm=%d, bytes=%d)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch)) result.recordBatch = recordBatch pr.bufferMu.Lock() pr.currentOffset = newOffset pr.bufferMu.Unlock() - glog.V(4).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, - req.requestedOffset, newOffset, len(recordBatch)) - } else { - result.recordBatch = []byte{} } }