From e9293cd2017723fe16ff7dde7498062d544c5fb1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 11:20:02 -0700 Subject: [PATCH] debug: Add channel closure detection for early message stream termination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 Continued: Early Channel Closure Detection Added detection and logging for when Sarama's claim.Messages() channel closes prematurely (indicating broker stream termination): Changes: - consumer.go: Distinguish between normal and abnormal channel closures - Mark partitions that close after <= 10 messages as CRITICAL - Log the last tracked offset alongside the HWM (reported as HighWaterMarkOffset()-1) in both the CRITICAL early-close message and the normal STOP message Current Test Results: Delivery: 84-87.5% (1974-2055 / 2349-2350) Missing: 12.5-16% (294-376 messages) Duplicates: 0 ✅ Errors: 0 ✅ Pattern: 2-3 partitions receive only 1-10 messages then channel closes Suggests: Broker or middleware prematurely closing subscription Key Observations: - Most (13/15) partitions work perfectly - Remaining issue is repeatable on same 2-3 partitions - Messages() channel closes after initial messages - Could be: * Broker connection reset * Fetch request error not being surfaced * Offset commit failure * Rebalancing triggered prematurely Next Investigation: - Add Sarama debug logging to see broker errors - Check if fetch requests are returning errors silently - Monitor offset commits on affected partitions - Test with longer-running consumer From 0% → 84-87.5% is EXCELLENT PROGRESS. Remaining 12.5-16% is concentrated on reproducible partitions. 
--- .../internal/consumer/consumer.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 3dd7e24c8..36947d704 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -647,9 +647,16 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, if len(gaps) > 0 { gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) } - log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d %s)", - h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), - float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + + // Check if we consumed just a few messages before stopping + if msgCount <= 10 { + log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + } else { + log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), + float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + } return nil } msgCount++