diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 3dd7e24c8..36947d704 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -647,9 +647,16 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, if len(gaps) > 0 { gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) } - log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d %s)", - h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), - float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + + // Check if we consumed just a few messages before stopping + if msgCount <= 10 { + log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + } else { + log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), + float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) + } return nil } msgCount++