From 851fe6f69e59a509e671ccf1c82bf64aed349b3d Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 00:42:03 -0700 Subject: [PATCH] feat: Add detailed logging for offset tracking and partition assignment --- .../internal/consumer/consumer.go | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go index ed266091d..844da7c2a 100644 --- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -584,8 +584,24 @@ type ConsumerGroupHandler struct { } // Setup is run at the beginning of a new session, before ConsumeClaim -func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { +func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { log.Printf("Consumer %d: Consumer group session setup", h.consumer.id) + + // Log the generation ID and member ID for this session + log.Printf("Consumer %d: Generation=%d, MemberID=%s", + h.consumer.id, session.GenerationID(), session.MemberID()) + + // Log all assigned partitions and their starting offsets + assignments := session.Claims() + totalPartitions := 0 + for topic, partitions := range assignments { + for _, partition := range partitions { + totalPartitions++ + log.Printf("Consumer %d: ASSIGNED %s[%d]", + h.consumer.id, topic, partition) + } + } + log.Printf("Consumer %d: Total partitions assigned: %d", h.consumer.id, totalPartitions) return nil } @@ -606,13 +622,35 @@ func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) erro // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages() func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { msgCount := 0 + topic := claim.Topic() + partition := claim.Partition() + initialOffset := claim.InitialOffset() + + // Log the starting offset for this partition + log.Printf("Consumer %d: START consuming %s[%d] from offset %d", + h.consumer.id, topic, partition, initialOffset) + + startTime := time.Now() + for { select { case message, ok := <-claim.Messages(): if !ok { + elapsed := time.Since(startTime) + log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d)", + h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), + float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1) return nil } msgCount++ + + // Log progress every 500 messages + if msgCount%500 == 0 { + elapsed := time.Since(startTime) + throughput := float64(msgCount) / elapsed.Seconds() + log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, rate=%.1f msgs/sec", + h.consumer.id, topic, partition, msgCount, message.Offset, throughput) + } // Process the message var key []byte @@ -621,16 +659,9 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, } if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { - log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err) + log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", + h.consumer.id, message.Topic, message.Partition, message.Offset, err) h.consumer.metricsCollector.RecordConsumerError() - - // Add a small delay for schema validation or other processing errors to avoid overloading - // select { - // case <-time.After(100 * time.Millisecond): - // // Continue after brief delay - // case <-session.Context().Done(): - // return nil - // } } else { // Track consumed message if h.consumer.tracker != nil { @@ -658,8 +689,9 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, } case <-session.Context().Done(): - log.Printf("Consumer %d: Session context cancelled for %s[%d], committing final offsets", - h.consumer.id, claim.Topic(), claim.Partition()) + elapsed := time.Since(startTime) + log.Printf("Consumer %d: Session context cancelled for %s[%d], committing final offsets (consumed %d messages in %.1f sec, last offset=%d)", + h.consumer.id, claim.Topic(), claim.Partition(), msgCount, elapsed.Seconds(), claim.HighWaterMarkOffset()-1) // Commit all remaining marked offsets before shutting down session.Commit() return nil