diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
index e7e6b4fae..ac0b045b5 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -627,6 +628,7 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 	initialOffset := claim.InitialOffset()
 	lastTrackedOffset := int64(-1)
 	gapCount := 0
+	var gaps []string // Track gap ranges for detailed analysis
 
 	// Log the starting offset for this partition
 	log.Printf("Consumer %d: START consuming %s[%d] from offset %d (HWM=%d)",
@@ -640,9 +642,14 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 		case message, ok := <-claim.Messages():
 			if !ok {
 				elapsed := time.Since(startTime)
-				log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d)",
+				// Log detailed gap analysis
+				gapSummary := "none"
+				if len(gaps) > 0 {
+					gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
+				}
+				log.Printf("Consumer %d: STOP consuming %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, total gaps=%d %s)",
 					h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
-					float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount)
+					float64(msgCount)/elapsed.Seconds(), claim.HighWaterMarkOffset()-1, gapCount, gapSummary)
 				return nil
 			}
 			msgCount++
@@ -651,8 +658,11 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 			if lastTrackedOffset >= 0 && message.Offset != lastTrackedOffset+1 {
 				gap := message.Offset - lastTrackedOffset - 1
 				gapCount++
-				log.Printf("Consumer %d: DEBUG offset gap in %s[%d]: offset %d -> %d (gap=%d messages)",
-					h.consumer.id, topic, partition, lastTrackedOffset, message.Offset, gap)
+				gapDesc := fmt.Sprintf("%d-%d", lastTrackedOffset+1, message.Offset-1)
+				gaps = append(gaps, gapDesc)
+				elapsed := time.Since(startTime)
+				log.Printf("Consumer %d: DEBUG offset gap in %s[%d] at %.1fs: offset %d -> %d (gap=%d messages, gapDesc=%s)",
+					h.consumer.id, topic, partition, elapsed.Seconds(), lastTrackedOffset, message.Offset, gap, gapDesc)
 			}
 			lastTrackedOffset = message.Offset
 
@@ -704,10 +714,22 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
 
 		case <-session.Context().Done():
 			elapsed := time.Since(startTime)
-			log.Printf("Consumer %d: Session context cancelled for %s[%d], committing final offsets (consumed %d messages in %.1f sec, last offset=%d)",
-				h.consumer.id, claim.Topic(), claim.Partition(), msgCount, elapsed.Seconds(), claim.HighWaterMarkOffset()-1)
-			// Commit all remaining marked offsets before shutting down
-			session.Commit()
+			lastOffset := claim.HighWaterMarkOffset() - 1
+			gapSummary := "none"
+			if len(gaps) > 0 {
+				gapSummary = fmt.Sprintf("[%s]", strings.Join(gaps, ", "))
+			}
+
+			// Determine if we reached HWM
+			reachedHWM := lastTrackedOffset >= (claim.HighWaterMarkOffset() - 1)
+			hwmStatus := "INCOMPLETE"
+			if reachedHWM {
+				hwmStatus = "COMPLETE"
+			}
+
+			log.Printf("Consumer %d: Context CANCELLED for %s[%d] after %d messages (%.1f sec, %.1f msgs/sec, last offset=%d, HWM=%d, status=%s, gaps=%d %s)",
+				h.consumer.id, topic, partition, msgCount, elapsed.Seconds(),
+				float64(msgCount)/elapsed.Seconds(), lastTrackedOffset, lastOffset, hwmStatus, gapCount, gapSummary)
 			return nil
 		}
 	}
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index f0ead2854..8aeb3ad96 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -1270,6 +1270,26 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
 		for _, name := range requestedTopics {
 			if h.seaweedMQHandler.TopicExists(name) {
 				topicsToReturn = append(topicsToReturn, name)
+			} else {
+				// Topic doesn't exist according to current cache, check broker directly
+				// This handles the race condition where producers just created topics
+				// and consumers are requesting metadata before cache TTL expires
+				glog.V(3).Infof("[METADATA v0] Topic %s not in cache, checking broker directly", name)
+				h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+				if h.seaweedMQHandler.TopicExists(name) {
+					glog.V(3).Infof("[METADATA v0] Topic %s found on broker after cache refresh", name)
+					topicsToReturn = append(topicsToReturn, name)
+				} else {
+					glog.V(3).Infof("[METADATA v0] Topic %s not found, auto-creating with default partitions", name)
+					// Auto-create topic (matches Kafka's auto.create.topics.enable=true)
+					if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+						glog.V(2).Infof("[METADATA v0] Failed to auto-create topic %s: %v", name, err)
+						// Don't add to topicsToReturn - client will get error
+					} else {
+						glog.V(2).Infof("[METADATA v0] Successfully auto-created topic %s", name)
+						topicsToReturn = append(topicsToReturn, name)
+					}
+				}
 			}
 		}
 	}
@@ -1345,6 +1365,22 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
 		for _, name := range requestedTopics {
 			if h.seaweedMQHandler.TopicExists(name) {
 				topicsToReturn = append(topicsToReturn, name)
+			} else {
+				// Topic doesn't exist according to current cache, check broker directly
+				glog.V(3).Infof("[METADATA v1] Topic %s not in cache, checking broker directly", name)
+				h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+				if h.seaweedMQHandler.TopicExists(name) {
+					glog.V(3).Infof("[METADATA v1] Topic %s found on broker after cache refresh", name)
+					topicsToReturn = append(topicsToReturn, name)
+				} else {
+					glog.V(3).Infof("[METADATA v1] Topic %s not found, auto-creating with default partitions", name)
+					if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+						glog.V(2).Infof("[METADATA v1] Failed to auto-create topic %s: %v", name, err)
+					} else {
+						glog.V(2).Infof("[METADATA v1] Successfully auto-created topic %s", name)
+						topicsToReturn = append(topicsToReturn, name)
+					}
+				}
 			}
 		}
 	}
@@ -1463,6 +1499,22 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
 		for _, name := range requestedTopics {
 			if h.seaweedMQHandler.TopicExists(name) {
 				topicsToReturn = append(topicsToReturn, name)
+			} else {
+				// Topic doesn't exist according to current cache, check broker directly
+				glog.V(3).Infof("[METADATA v2] Topic %s not in cache, checking broker directly", name)
+				h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+				if h.seaweedMQHandler.TopicExists(name) {
+					glog.V(3).Infof("[METADATA v2] Topic %s found on broker after cache refresh", name)
+					topicsToReturn = append(topicsToReturn, name)
+				} else {
+					glog.V(3).Infof("[METADATA v2] Topic %s not found, auto-creating with default partitions", name)
+					if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+						glog.V(2).Infof("[METADATA v2] Failed to auto-create topic %s: %v", name, err)
+					} else {
+						glog.V(2).Infof("[METADATA v2] Successfully auto-created topic %s", name)
+						topicsToReturn = append(topicsToReturn, name)
+					}
+				}
 			}
 		}
 	}
@@ -1571,6 +1623,22 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
 		for _, name := range requestedTopics {
 			if h.seaweedMQHandler.TopicExists(name) {
 				topicsToReturn = append(topicsToReturn, name)
+			} else {
+				// Topic doesn't exist according to current cache, check broker directly
+				glog.V(3).Infof("[METADATA v3/v4] Topic %s not in cache, checking broker directly", name)
+				h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+				if h.seaweedMQHandler.TopicExists(name) {
+					glog.V(3).Infof("[METADATA v3/v4] Topic %s found on broker after cache refresh", name)
+					topicsToReturn = append(topicsToReturn, name)
+				} else {
+					glog.V(3).Infof("[METADATA v3/v4] Topic %s not found, auto-creating with default partitions", name)
+					if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+						glog.V(2).Infof("[METADATA v3/v4] Failed to auto-create topic %s: %v", name, err)
+					} else {
+						glog.V(2).Infof("[METADATA v3/v4] Successfully auto-created topic %s", name)
+						topicsToReturn = append(topicsToReturn, name)
+					}
+				}
 			}
 		}
 	}