diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index 36947d704..6b23fdfe9 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -587,18 +587,18 @@ type ConsumerGroupHandler struct { // Setup is run at the beginning of a new session, before ConsumeClaim func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { log.Printf("Consumer %d: Consumer group session setup", h.consumer.id) - + // Log the generation ID and member ID for this session - log.Printf("Consumer %d: Generation=%d, MemberID=%s", + log.Printf("Consumer %d: Generation=%d, MemberID=%s", h.consumer.id, session.GenerationID(), session.MemberID()) - + // Log all assigned partitions and their starting offsets assignments := session.Claims() totalPartitions := 0 for topic, partitions := range assignments { for _, partition := range partitions { totalPartitions++ - log.Printf("Consumer %d: ASSIGNED %s[%d]", + log.Printf("Consumer %d: ASSIGNED %s[%d]", h.consumer.id, topic, partition) } } @@ -629,14 +629,14 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, lastTrackedOffset := int64(-1) gapCount := 0 var gaps []string // Track gap ranges for detailed analysis - + // Log the starting offset for this partition - log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)", + log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)", h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset()) - + startTime := time.Now() lastLogTime := time.Now() - + for { select { case message, ok := <-claim.Messages(): @@ -647,20 +647,20 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, if len(gaps) > 0 { gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) } - + // Check if we consumed just a few messages before stopping if msgCount <= 10 { - log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", + log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) } else { - log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", - h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), + log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", + h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) } return nil } msgCount++ - + // Track gaps in offset sequence (indicates missed messages) if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 { gap := message.Offset - lastTrackedOffset - 1 @@ -668,17 +668,17 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1) gaps = append(gaps, gapDesc) elapsed := time.Since(startTime) - log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)", + log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)", h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc) } lastTrackedOffset = message.Offset - + // Log progress every 500 messages OR every 5 seconds now := time.Now() if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second { elapsed := time.Since(startTime) throughput := float64(msgCount) / elapsed.Seconds() - log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", + log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount) lastLogTime = now } @@ -690,7 +690,7 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, } if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { - log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", + log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", h.consumer.id, message.Topic, message.Partition, message.Offset, err) h.consumer.metricsCollector.RecordConsumerError() } else { @@ -726,33 +726,33 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, if len(gaps) > 0 { gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) } - + // Determine if we reached HWM - reachedHWM := lastTrackedOffset >= (claim.HighWaterMarkOffset() - 1) + reachedHWM := lastTrackedOffset >= lastOffset hwmStatus := "INCOMPLETE" if reachedHWM { hwmStatus := "COMPLETE" _ = hwmStatus // Use it to avoid warning } - + // Calculate consumption rate for this partition consumptionRate := float64(0) if elapsed.Seconds() > 0 { consumptionRate = float64(msgCount) / elapsed.Seconds() } - + // Log both normal and abnormal completions if msgCount == 0 { // Partition never got ANY messages - critical issue - log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)", + log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)", h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus) } else if msgCount < 10 && msgCount > 0 { // Very few messages then stopped - likely hung fetch - log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)", + log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)", h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary) } else { // Normal completion - log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)", + log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)", h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary) }