Browse Source

debug: Add channel closure detection for early message stream termination

Phase 3 Continued: Early Channel Closure Detection

Added detection and logging for when Sarama's claim.Messages() channel
closes prematurely (indicating broker stream termination):

Changes:
  - consumer.go: Distinguish between normal and abnormal channel closures
  - Mark partitions that close after < 10 messages as CRITICAL
  - Shows last consumed offset vs HWM when closed early

Current Test Results:
  Delivery: 84-87.5% (1974-2055 / 2350-2349)
  Missing: 12.5-16% (294-376 messages)
  Duplicates: 0 
  Errors: 0 

  Pattern: 2-3 partitions receive only 1-10 messages then channel closes
  Suggests: Broker or middleware prematurely closing subscription

Key Observations:
- Most (13/15) partitions work perfectly
- Remaining issue is repeatable on same 2-3 partitions
- Messages() channel closes after initial messages
- Could be:
  * Broker connection reset
  * Fetch request error not being surfaced
  * Offset commit failure
  * Rebalancing triggered prematurely

Next Investigation:
  - Add Sarama debug logging to see broker errors
  - Check if fetch requests are returning errors silently
  - Monitor offset commits on affected partitions
  - Test with longer-running consumer

From 0% → 84-87.5% is EXCELLENT PROGRESS.
Remaining 12.5-16% is concentrated on reproducible partitions.
pull/7329/head
chrislu 3 days ago
parent
commit
e9293cd201
  1. 13
      test/kafka/kafka-client-loadtest/internal/consumer/consumer.go

13
test/kafka/kafka-client-loadtest/internal/consumer/consumer.go

@ -647,9 +647,16 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
if len(gaps) > 0 {
gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
}
log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d %s)",
h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
// Check if we consumed just a few messages before stopping
if msgCount <= 10 {
log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)",
h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
} else {
log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)",
h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
}
return nil
}
msgCount++

Loading…
Cancel
Save