diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
index ac0b045b5..3dd7e24c8 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -724,11 +724,29 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
             reachedHWM := lastTrackedOffset >= (claim.HighWaterMarkOffset() - 1)
             hwmStatus := "INCOMPLETE"
             if reachedHWM {
                 hwmStatus = "COMPLETE"
             }
-            log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
-                h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
-                float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary)
+            // Calculate the consumption rate for this partition, guarding against division by zero
+            consumptionRate := float64(0)
+            if elapsed.Seconds() > 0 {
+                consumptionRate = float64(msgCount) / elapsed.Seconds()
+            }
+
+            // Log both normal and abnormal completions
+            if msgCount == 0 {
+                // Partition never received any messages - critical issue
+                log.Printf("Consumer %d: CRITICAL - NO MESSAGES from %s[%d] (HWM=%d, status=%s)",
+                    h.consumer.id, topic, partition, claim.HighWaterMarkOffset()-1, hwmStatus)
+            } else if msgCount < 10 {
+                // Very few messages before the stop - likely a hung fetch
+                log.Printf("Consumer %d: HUNG FETCH on %s[%d]: only %d messages before stop at offset=%d (HWM=%d, rate=%.2f msgs/sec, gaps=%d %s)",
+                    h.consumer.id, topic, partition, msgCount, lastTrackedOffset, claim.HighWaterMarkOffset()-1, consumptionRate, gapCount, gapSummary)
+            } else {
+                // Normal completion
+                log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
+                    h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+                    consumptionRate, lastTrackedOffset, claim.HighWaterMarkOffset()-1, hwmStatus, gapCount, gapSummary)
+            }
             return nil
         }
     }
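Review note on the consumer.go hunk: hwmStatus is set with plain assignment (=) inside the if block. Using a short variable declaration (:=) there would compile only with a `_ = hwmStatus` workaround, because it declares a new variable that shadows the outer hwmStatus, and every log line below would then report status=INCOMPLETE even after the partition reached the high water mark. A minimal standalone sketch of the pitfall (illustrative only, not part of the patch):

    package main

    import "fmt"

    func main() {
        status := "INCOMPLETE"
        if true {
            status := "COMPLETE" // BUG: ':=' declares a new variable that shadows the outer one
            _ = status           // silences "declared and not used", hiding the bug
        }
        fmt.Println(status) // prints "INCOMPLETE": the outer variable was never updated

        if true {
            status = "COMPLETE" // '=' assigns to the existing outer variable
        }
        fmt.Println(status) // prints "COMPLETE"
    }

The shadow analyzer from golang.org/x/tools (golang.org/x/tools/go/analysis/passes/shadow) catches this class of mistake; plain `go vet` does not flag it by default.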
diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index 6c7237698..de0ff15f9 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -179,6 +179,8 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 
 // readRecords reads records forward using the multi-batch fetcher
 func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
+    fetchStartTime := time.Now()
+
     // Create context with timeout based on Kafka fetch request's MaxWaitTime
     // This ensures we wait exactly as long as the client requested
     fetchCtx := ctx
@@ -207,6 +209,12 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
     )
     fetchDuration := time.Since(startTime)
 
+    // Log slow fetches (potential hangs)
+    if fetchDuration > 2*time.Second {
+        glog.Warningf("[%s] SLOW FETCH for %s[%d]: offset=%d took %.2fs (maxWait=%dms, HWM=%d)",
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration.Seconds(), maxWaitMs, highWaterMark)
+    }
+
     if err == nil && fetchResult.TotalSize > 0 {
         glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)",
             pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
@@ -221,10 +229,18 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 
     // Use original context for fallback, NOT the timed-out fetchCtx
     // This ensures the fallback has a fresh chance to fetch data
+    fallbackStartTime := time.Now()
     smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
+    fallbackDuration := time.Since(fallbackStartTime)
+
+    if fallbackDuration > 2*time.Second {
+        glog.Warningf("[%s] SLOW FALLBACK for %s[%d]: offset=%d took %.2fs",
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fallbackDuration.Seconds())
+    }
+
     if err != nil {
         glog.Warningf("[%s] Single-batch fetch also failed for %s[%d] offset=%d: %v (total duration: %v)",
-            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(startTime))
+            pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(fetchStartTime))
         return []byte{}, fromOffset
     }
 
@@ -233,13 +249,13 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
         nextOffset := fromOffset + int64(len(smqRecords))
         glog.V(4).Infof("[%s] Single-batch fallback for %s[%d]: %d records, %d bytes, offset %d -> %d (total duration: %v)",
             pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-            len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(startTime))
+            len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(fetchStartTime))
         return recordBatch, nextOffset
     }
 
     // No records available
     glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d (total duration: %v)",
-        pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(startTime))
+        pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
     return []byte{}, fromOffset
 }
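Review note on the fetch_partition_reader.go hunks: the three new timing checks share one shape - capture a start time, compute the elapsed duration, and warn when it crosses a hard-coded 2-second threshold. If this instrumentation spreads to more call sites, it could be factored into a small helper so the threshold and log format live in one place. A sketch of that idea, where the helper name warnIfSlow and its placement in the protocol package are assumptions, not part of this patch:

    package protocol

    import (
        "time"

        "github.com/seaweedfs/seaweedfs/weed/glog" // assumed import path for SeaweedFS's glog wrapper
    )

    // warnIfSlow (hypothetical helper) warns when the time elapsed since start
    // exceeds threshold. Because deferred call arguments are evaluated at the
    // defer statement, passing time.Now() captures the start of the operation.
    func warnIfSlow(start time.Time, threshold time.Duration, format string, args ...interface{}) {
        if elapsed := time.Since(start); elapsed > threshold {
            glog.Warningf(format+" took %.2fs", append(args, elapsed.Seconds())...)
        }
    }

Used as `defer warnIfSlow(time.Now(), 2*time.Second, "[%s] SLOW FETCH for %s[%d]: offset=%d", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset)` at the top of readRecords, it would time the whole call rather than each phase; the inline per-phase checks above give finer-grained attribution (fetch vs. fallback), which is the design choice this patch makes.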