feat: Add detailed logging for offset tracking and partition assignment

pull/7329/head
chrislu, 4 days ago
commit 851fe6f69e
test/kafka/kafka-client-loadtest/internal/consumer/consumer.go | 56
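
For context, the type being changed implements Sarama's ConsumerGroupHandler interface. Below is a minimal, self-contained sketch (not part of this commit) of how such a handler is typically wired into a consumer group; the broker address, topic, and group name are placeholders, and the import path assumes the IBM fork of Sarama rather than whatever the load test actually vendors.

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama" // assumed import path; older code uses github.com/Shopify/sarama
)

// loggingHandler is a stand-in for the load test's ConsumerGroupHandler.
type loggingHandler struct{}

// Setup runs once per session, after the group has assigned partitions to this member.
func (h *loggingHandler) Setup(s sarama.ConsumerGroupSession) error {
	log.Printf("generation=%d member=%s claims=%v", s.GenerationID(), s.MemberID(), s.Claims())
	return nil
}

// Cleanup runs when the session ends, e.g. on rebalance or shutdown.
func (h *loggingHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim is invoked once per assigned partition and loops over its messages.
func (h *loggingHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
	log.Printf("consuming %s[%d] from offset %d", c.Topic(), c.Partition(), c.InitialOffset())
	for msg := range c.Messages() {
		s.MarkMessage(msg, "") // mark the offset so it can be committed
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "loadtest-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	ctx := context.Background()
	for ctx.Err() == nil {
		// Consume blocks for one session: Setup, then ConsumeClaim per partition, then Cleanup.
		if err := group.Consume(ctx, []string{"loadtest-topic"}, &loggingHandler{}); err != nil {
			log.Printf("consume error: %v", err)
		}
	}
}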

@@ -584,8 +584,24 @@ type ConsumerGroupHandler struct {
 }
 
 // Setup is run at the beginning of a new session, before ConsumeClaim
-func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
 	log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+
+	// Log the generation ID and member ID for this session
+	log.Printf("Consumer %d: Generation=%d, MemberID=%s",
+		h.consumer.id, session.GenerationID(), session.MemberID())
+
+	// Log all assigned partitions and their starting offsets
+	assignments := session.Claims()
+	totalPartitions := 0
+	for topic, partitions := range assignments {
+		for _, partition := range partitions {
+			totalPartitions++
+			log.Printf("Consumer %d: ASSIGNED %s[%d]",
+				h.consumer.id, topic, partition)
+		}
+	}
+	log.Printf("Consumer %d: Total partitions assigned: %d", h.consumer.id, totalPartitions)
 
 	return nil
 }
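
For reference, session.Claims() returns this member's full assignment as a map keyed by topic name, which is what the nested loop above walks. A tiny runnable sketch with made-up topic and partition numbers:

package main

import "fmt"

func main() {
	// Hypothetical assignment, for illustration only: the same shape that
	// session.Claims() returns (map[string][]int32, keyed by topic).
	assignments := map[string][]int32{
		"loadtest-topic-0": {0, 2}, // this member owns partitions 0 and 2
		"loadtest-topic-1": {1},    // and partition 1 of a second topic
	}
	total := 0
	for topic, partitions := range assignments {
		for _, p := range partitions {
			total++
			fmt.Printf("ASSIGNED %s[%d]\n", topic, p)
		}
	}
	fmt.Printf("total partitions assigned: %d\n", total)
}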
@@ -606,13 +622,35 @@ func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
 func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	msgCount := 0
+	topic := claim.Topic()
+	partition := claim.Partition()
+	initialOffset := claim.InitialOffset()
+
+	// Log the starting offset for this partition
+	log.Printf("Consumer %d: START consuming %s[%d] from offset %d",
+		h.consumer.id, topic, partition, initialOffset)
+
+	startTime := time.Now()
 
 	for {
 		select {
 		case message, ok := <-claim.Messages():
 			if !ok {
+				elapsed := time.Since(startTime)
+				log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d)",
+					h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+					float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1)
 				return nil
 			}
 			msgCount++
+
+			// Log progress every 500 messages
+			if msgCount%500 == 0 {
+				elapsed := time.Since(startTime)
+				throughput := float64(msgCount) / elapsed.Seconds()
+				log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, rate=%.1f msgs/sec",
+					h.consumer.id, topic, partition, msgCount, message.Offset, throughput)
+			}
+
 			// Process the message
 			var key []byte
@@ -621,16 +659,9 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 			}
 
 			if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
-				log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+				log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v",
+					h.consumer.id, message.Topic, message.Partition, message.Offset, err)
 				h.consumer.metricsCollector.RecordConsumerError()
-				// Add a small delay for schema validation or other processing errors to avoid overloading
-				// select {
-				// case <-time.After(100 * time.Millisecond):
-				// 	// Continue after brief delay
-				// case <-session.Context().Done():
-				// 	return nil
-				// }
 			} else {
 				// Track consumed message
 				if h.consumer.tracker != nil {
@@ -658,8 +689,9 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 			}
 
 		case <-session.Context().Done():
-			log.Printf("Consumer %d: Session context cancelled for %s[%d], committing final offsets",
-				h.consumer.id, claim.Topic(), claim.Partition())
+			elapsed := time.Since(startTime)
+			log.Printf("Consumer %d: Session context cancelled for %s[%d], committing final offsets (consumed %d messages in %.1f sec, last offset=%d)",
+				h.consumer.id, claim.Topic(), claim.Partition(), msgCount, elapsed.Seconds(), claim.HighWaterMarkOffset()-1)
 			// Commit all remaining marked offsets before shutting down
 			session.Commit()
 			return nil
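
Not part of this commit, but a note on the last-offset figure logged above: HighWaterMarkOffset() is the offset the broker will assign to the next produced message, so HighWaterMarkOffset()-1 is the newest offset available in the partition rather than the last offset this consumer actually processed. A minimal sketch of turning the same two values into a per-partition lag estimate; this hypothetical helper could live alongside the handler in the same file, since it only needs the sarama import already present there:

// consumerLag (hypothetical, not in the load test) estimates how far this
// consumer trails the end of the partition for the message it just processed.
// HighWaterMarkOffset() is the offset the broker will assign to the next
// produced message, so the newest available message sits at that value minus one.
func consumerLag(claim sarama.ConsumerGroupClaim, msg *sarama.ConsumerMessage) int64 {
	return claim.HighWaterMarkOffset() - 1 - msg.Offset
}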
