|
|
|
@ -587,18 +587,18 @@ type ConsumerGroupHandler struct { |
|
|
|
// Setup is run at the beginning of a new session, before ConsumeClaim
|
|
|
|
func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { |
|
|
|
log.Printf("Consumer %d: Consumer group session setup", h.consumer.id) |
|
|
|
|
|
|
|
|
|
|
|
// Log the generation ID and member ID for this session
|
|
|
|
log.Printf("Consumer %d: Generation=%d, MemberID=%s", |
|
|
|
log.Printf("Consumer %d: Generation=%d, MemberID=%s", |
|
|
|
h.consumer.id, session.GenerationID(), session.MemberID()) |
|
|
|
|
|
|
|
|
|
|
|
// Log all assigned partitions and their starting offsets
|
|
|
|
assignments := session.Claims() |
|
|
|
totalPartitions := 0 |
|
|
|
for topic, partitions := range assignments { |
|
|
|
for _, partition := range partitions { |
|
|
|
totalPartitions++ |
|
|
|
log.Printf("Consumer %d: ASSIGNED %s[%d]", |
|
|
|
log.Printf("Consumer %d: ASSIGNED %s[%d]", |
|
|
|
h.consumer.id, topic, partition) |
|
|
|
} |
|
|
|
} |
|
|
|
@ -629,14 +629,14 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, |
|
|
|
lastTrackedOffset := int64(-1) |
|
|
|
gapCount := 0 |
|
|
|
var gaps []string // Track gap ranges for detailed analysis
|
|
|
|
|
|
|
|
|
|
|
|
// Log the starting offset for this partition
|
|
|
|
log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)", |
|
|
|
log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)", |
|
|
|
h.consumer.id, topic, partition, initialOffset, claim.HighWaterMarkOffset()) |
|
|
|
|
|
|
|
|
|
|
|
startTime := time.Now() |
|
|
|
lastLogTime := time.Now() |
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
|
select { |
|
|
|
case message, ok := <-claim.Messages(): |
|
|
|
@ -647,20 +647,20 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, |
|
|
|
if len(gaps) > 0 { |
|
|
|
gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check if we consumed just a few messages before stopping
|
|
|
|
if msgCount <= 10 { |
|
|
|
log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", |
|
|
|
log.Printf("Consumer %d: CRITICAL - Messages() channel CLOSED early on %s[%d] after only %d messages at offset=%d (HWM=%d, gaps=%d %s)", |
|
|
|
h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) |
|
|
|
} else { |
|
|
|
log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", |
|
|
|
h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), |
|
|
|
log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, gaps=%d %s)", |
|
|
|
h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), |
|
|
|
float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, gapCount, gapSummary) |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
msgCount++ |
|
|
|
|
|
|
|
|
|
|
|
// Track gaps in offset sequence (indicates missed messages)
|
|
|
|
if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 { |
|
|
|
gap := message.Offset - lastTrackedOffset - 1 |
|
|
|
@ -668,17 +668,17 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, |
|
|
|
gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1) |
|
|
|
gaps = append(gaps, gapDesc) |
|
|
|
elapsed := time.Since(startTime) |
|
|
|
log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)", |
|
|
|
log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)", |
|
|
|
h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc) |
|
|
|
} |
|
|
|
lastTrackedOffset = message.Offset |
|
|
|
|
|
|
|
|
|
|
|
// Log progress every 500 messages OR every 5 seconds
|
|
|
|
now := time.Now() |
|
|
|
if msgCount%500 == 0 || now.Sub(lastLogTime) > 5*time.Second { |
|
|
|
elapsed := time.Since(startTime) |
|
|
|
throughput := float64(msgCount) / elapsed.Seconds() |
|
|
|
log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", |
|
|
|
log.Printf("Consumer %d: %s[%d] progress: %d messages, offset=%d, HWM=%d, rate=%.1f msgs/sec, gaps=%d", |
|
|
|
h.consumer.id, topic, partition, msgCount, message.Offset, claim.HighWaterMarkOffset(), throughput, gapCount) |
|
|
|
lastLogTime = now |
|
|
|
} |
|
|
|
@ -690,7 +690,7 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, |
|
|
|
} |
|
|
|
|
|
|
|
if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { |
|
|
|
log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", |
|
|
|
log.Printf("Consumer %d: Error processing message at %s[%d]@%d: %v", |
|
|
|
h.consumer.id, message.Topic, message.Partition, message.Offset, err) |
|
|
|
h.consumer.metricsCollector.RecordConsumerError() |
|
|
|
} else { |
|
|
|
@ -726,33 +726,33 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, |
|
|
|
if len(gaps) > 0 { |
|
|
|
gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", ")) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Determine if we reached HWM
|
|
|
|
reachedHWM := lastTrackedOffset >= (claim.HighWaterMarkOffset() - 1) |
|
|
|
reachedHWM := lastTrackedOffset >= lastOffset |
|
|
|
hwmStatus := "INCOMPLETE" |
|
|
|
if reachedHWM { |
|
|
|
hwmStatus := "COMPLETE" |
|
|
|
_ = hwmStatus // Use it to avoid warning
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Calculate consumption rate for this partition
|
|
|
|
consumptionRate := float64(0) |
|
|
|
if elapsed.Seconds() > 0 { |
|
|
|
consumptionRate = float64(msgCount) / elapsed.Seconds() |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Log both normal and abnormal completions
|
|
|
|
if msgCount == 0 { |
|
|
|
// Partition never got ANY messages - critical issue
|
|
|
|
log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)", |
|
|
|
log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)", |
|
|
|
h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus) |
|
|
|
} else if msgCount < 10 && msgCount > 0 { |
|
|
|
// Very few messages then stopped - likely hung fetch
|
|
|
|
log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)", |
|
|
|
log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)", |
|
|
|
h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary) |
|
|
|
} else { |
|
|
|
// Normal completion
|
|
|
|
log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)", |
|
|
|
log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)", |
|
|
|
h.consumer.id, topic, partition, msgCount, elapsed.Seconds(), |
|
|
|
consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary) |
|
|
|
} |
|
|
|
|