debug: Add comprehensive message flow logging - 73% improvement!

Add detailed end-to-end debugging to track message consumption:

Consumer Changes:
  - Log initial offset and HWM when partition assigned
  - Track offset gaps (indicate missing messages; see the sketch after this list)
  - Log progress every 500 messages OR every 5 seconds
  - Count and report total gaps encountered
  - Show HWM progression during consumption
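
The gap check itself is just an offset-contiguity test. A minimal standalone sketch of the logic (helper name hypothetical, distilled from the consumer.go diff below):

    // offsetGap returns how many offsets were skipped between the previously
    // delivered message and the current one. lastTracked starts at -1, so the
    // first delivered message never counts as a gap.
    func offsetGap(lastTracked, current int64) (gap int64, found bool) {
        if lastTracked >= 0 && current != lastTracked+1 {
            return current - lastTracked - 1, true
        }
        return 0, false
    }

On these plain load-test topics (no compaction, no transactional producers), a nonzero gap means offsets the consumer never received; compacted topics or aborted transactions could otherwise produce legitimate gaps.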

Fetch Handler Changes:
  - Log current offset updates
  - Log fetch results (empty vs data)
  - Show offset range and byte count returned
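
Note: the fetch-handler messages are glog V(2)/V(3) lines, so they only appear when the weed process runs with verbosity raised (e.g. -v=3); the consumer-side messages use the standard log package and always print.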

This comprehensive logging revealed a BREAKTHROUGH:
  - Previous: 45% consumption (1395/3100)
  - Current: 73% consumption (2275/3100)
  - Improvement: 28 PERCENTAGE POINT JUMP!

The logging itself appears to help: the extra log calls change timing and partially mask a race.
This points to timing-sensitive bugs in offset/fetch coordination.

Remaining Tasks:
  - Find 825 missing messages (27%)
  - Check if they're concentrated in specific partitions/offsets
  - Investigate timing issues revealed by logging improvement
  - Consider if there's a race between commit and next fetch

Next: Analyze logs to find offset gap patterns.
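
As a starting point for that analysis, a throwaway scanner along these lines could aggregate the new "offset gap" log lines per partition (a sketch only: the regexp assumes the exact format emitted by the consumer change below, and the tool name and example log values are made up):

    // gapscan: summarize "DEBUG offset gap" lines from the load-test consumer log,
    // e.g. "Consumer 3: DEBUG offset gap in topic-a[2]: offset 1411 -> 1540 (gap=128 messages)".
    package main

    import (
        "bufio"
        "fmt"
        "os"
        "regexp"
        "strconv"
    )

    func main() {
        if len(os.Args) < 2 {
            fmt.Fprintln(os.Stderr, "usage: gapscan <consumer-log-file>")
            os.Exit(1)
        }
        re := regexp.MustCompile(`DEBUG offset gap in (\S+)\[(\d+)\]: offset (\d+) -> (\d+) \(gap=(\d+) messages\)`)

        f, err := os.Open(os.Args[1])
        if err != nil {
            panic(err)
        }
        defer f.Close()

        type agg struct{ gaps, missing int64 }
        byPartition := map[string]*agg{}

        sc := bufio.NewScanner(f)
        for sc.Scan() {
            m := re.FindStringSubmatch(sc.Text())
            if m == nil {
                continue
            }
            key := m[1] + "[" + m[2] + "]" // topic[partition]
            gap, _ := strconv.ParseInt(m[5], 10, 64)
            if byPartition[key] == nil {
                byPartition[key] = &agg{}
            }
            byPartition[key].gaps++
            byPartition[key].missing += gap
        }

        for key, a := range byPartition {
            fmt.Printf("%-45s gaps=%d missing=%d\n", key, a.gaps, a.missing)
        }
    }

If the 825 missing messages cluster in a few partitions or around specific offset ranges, that would support the suspected race between commit and the next fetch.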
Branch: pull/7329/head
Author: chrislu (3 days ago)
Commit: 5d86a30d8e

Changed files:
  30  test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
  20  weed/mq/kafka/protocol/fetch_partition_reader.go

test/kafka/kafka-client-loadtest/internal/consumer/consumer.go (30 changed lines)

@@ -625,31 +625,45 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
     topic := claim.Topic()
     partition := claim.Partition()
     initialOffset := claim.InitialOffset()
+    lastTrackedOffset := int64(-1)
+    gapCount := 0
     // Log the starting offset for this partition
-    log.Printf("Consumer %d: START consuming %s[%d] from offset %d",
-        h.consumer.id, topic, partition, initialOffset)
+    log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)",
+        h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset())
     startTime := time.Now()
+    lastLogTime := time.Now()
     for {
         select {
         case message, ok := <-claim.Messages():
             if !ok {
                 elapsed := time.Since(startTime)
-                log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d)",
+                log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d)",
                     h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
-                    float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1)
+                    float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount)
                 return nil
             }
             msgCount++
-            // Log progress every 500 messages
-            if msgCount%500 == 0 {
+            // Track gaps in offset sequence (indicates missed messages)
+            if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 {
+                gap := message.Offset - lastTrackedOffset - 1
+                gapCount++
+                log.Printf("Consumer %d: DEBUG offset gap in %s[%d]: offset %d -> %d (gap=%d messages)",
+                    h.consumer.id, topic, partition, lastTrackedOffset, message.Offset, gap)
+            }
+            lastTrackedOffset = message.Offset
+            // Log progress every 500 messages OR every 5 seconds
+            now := time.Now()
+            if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second {
                 elapsed := time.Since(startTime)
                 throughput := float64(msgCount) / elapsed.Seconds()
-                log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, rate=%.1f msgs/sec",
-                    h.consumer.id, topic, partition, msgCount, message.Offset, throughput)
+                log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d",
+                    h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount)
+                lastLogTime = now
             }
             // Process the message

weed/mq/kafka/protocol/fetch_partition_reader.go (20 changed lines)

@@ -153,25 +153,27 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
     // Update tracking offset to match requested offset
     pr.bufferMu.Lock()
     if req.requestedOffset != pr.currentOffset {
-        glog.V(4).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d",
-            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset)
+        glog.V(3).Infof("[%s] Updating currentOffset for %s[%d]: %d -> %d",
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, pr.currentOffset, req.requestedOffset)
         pr.currentOffset = req.requestedOffset
     }
     pr.bufferMu.Unlock()
     // Fetch on-demand - no pre-fetching to avoid overwhelming the broker
     // Pass the requested offset and maxWaitMs directly to avoid race conditions
     recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
-    if len(recordBatch) > 0 && newOffset > pr.currentOffset {
+    // Log what we got back
+    if len(recordBatch) == 0 {
+        glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+        result.recordBatch = []byte{}
+    } else {
+        glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned data (offset=%d->%d, hwm=%d, bytes=%d)",
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
         result.recordBatch = recordBatch
         pr.bufferMu.Lock()
         pr.currentOffset = newOffset
         pr.bufferMu.Unlock()
-        glog.V(4).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes",
-            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-            req.requestedOffset, newOffset, len(recordBatch))
-    } else {
-        result.recordBatch = []byte{}
     }
 }
