From a5db489f82fd48446ff25246013a9114bc50c70b Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 17 Oct 2025 09:55:07 -0700
Subject: [PATCH] fix: Add topic cache invalidation and auto-creation on
 metadata requests

Add an InvalidateTopicExistsCache method to SeaweedMQHandlerInterface and
implement cache refresh logic in the metadata response handler. When a
consumer requests metadata for a topic that does not appear in the cache
(but was just created by a producer), force a fresh broker check and
auto-create the topic if needed with default partitions.

This fix attempts to address the consumer stalling issue by:

1. Invalidating stale cache entries before checking the broker
2. Automatically creating topics on metadata requests (like Kafka's
   auto.create.topics.enable=true)
3. Returning topics to consumers more reliably

However, testing shows consumers still can't find topics even after
creation, suggesting a deeper issue with topic persistence or broker
client communication.

Added InvalidateTopicExistsCache to the mock handler as a no-op for
testing.

Note: Integration testing reveals that consumers get 'topic does not
exist' errors even when producers successfully create topics. This
suggests the real issue is one of the following:

- Topics created by producers aren't visible to broker client queries
- Broker client TopicExists() doesn't work correctly
- There's a race condition in topic creation/registration

Requires further investigation of the broker client implementation and
SMQ topic persistence logic.
---
 .../consumer/consumer_stalling_test.go     |  2 --
 weed/mq/kafka/gateway/test_mock_handler.go |  4 ++++
 weed/mq/kafka/protocol/handler.go          | 22 +++++++++++++++++++
 .../protocol/offset_fetch_pattern_test.go  |  2 +-
 4 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go
index d4dff1b5b..8e67f703e 100644
--- a/test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go
@@ -107,8 +107,6 @@ func TestEmptyFetchShouldNotStopConsumer(t *testing.T) {
 	nextFetchOffset := committedOffset + 1
 
 	// First attempt: get empty (transient - data might not be available yet)
-	firstFetchResult := 0 // bytes returned
-
 	// WRONG behavior (bug): Consumer sees 0 bytes and stops
 	// wrongConsumerLogic := (firstFetchResult == 0) // gives up!
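The test above encodes the expected consumer behavior: an empty fetch is transient and must not end the poll loop. As a rough sketch (not part of this patch), the retry loop it describes could look like the following Go fragment; fetch, process, and the backoff constants are hypothetical stand-ins rather than names from the load-test code.

package consumerdemo

import "time"

// consumeLoop sketches the behavior TestEmptyFetchShouldNotStopConsumer
// describes: an empty fetch is treated as transient, so the consumer backs
// off and retries at the same offset instead of stopping. fetch and process
// are hypothetical callbacks, not functions from the SeaweedFS load test.
func consumeLoop(fetch func(offset int64) ([][]byte, error), process func([][]byte), startOffset int64) error {
	const (
		maxEmptyPolls = 10
		pollBackoff   = 500 * time.Millisecond
	)

	offset := startOffset
	emptyPolls := 0
	for emptyPolls < maxEmptyPolls {
		records, err := fetch(offset)
		if err != nil {
			return err
		}
		if len(records) == 0 {
			// The buggy behavior would be to give up here; instead, count
			// the empty poll, back off, and retry the same offset.
			emptyPolls++
			time.Sleep(pollBackoff)
			continue
		}
		emptyPolls = 0
		offset += int64(len(records)) // advance past the records just consumed
		process(records)
	}
	return nil
}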
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
index 8a76f811d..ef0a012ef 100644
--- a/weed/mq/kafka/gateway/test_mock_handler.go
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -98,6 +98,10 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
 	return info, exists
 }
 
+func (m *mockSeaweedMQHandler) InvalidateTopicExistsCache(topic string) {
+	// Mock handler doesn't cache topic existence, so this is a no-op
+}
+
 func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 07896518e..f0ead2854 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -172,6 +172,7 @@ type SeaweedMQHandlerInterface interface {
 	CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error
 	DeleteTopic(topic string) error
 	GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
+	InvalidateTopicExistsCache(topic string)
 	// Ledger methods REMOVED - SMQ handles Kafka offsets natively
 	ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
 	ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
@@ -1727,6 +1728,27 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
 			topicsToReturn = append(topicsToReturn, topic)
 		} else if h.seaweedMQHandler.TopicExists(topic) {
 			topicsToReturn = append(topicsToReturn, topic)
+		} else {
+			// Topic doesn't exist according to current cache, but let's check broker directly
+			// This handles the race condition where producers just created topics
+			// and consumers are requesting metadata before cache TTL expires
+			glog.V(3).Infof("[METADATA v%d] Topic %s not in cache, checking broker directly", apiVersion, topic)
+			// Force cache invalidation to do fresh broker check
+			h.seaweedMQHandler.InvalidateTopicExistsCache(topic)
+			if h.seaweedMQHandler.TopicExists(topic) {
+				glog.V(3).Infof("[METADATA v%d] Topic %s found on broker after cache refresh", apiVersion, topic)
+				topicsToReturn = append(topicsToReturn, topic)
+			} else {
+				glog.V(3).Infof("[METADATA v%d] Topic %s not found on broker, auto-creating with default partitions", apiVersion, topic)
+				// Auto-create non-system topics with default partitions (matches Kafka behavior)
+				if err := h.createTopicWithSchemaSupport(topic, h.GetDefaultPartitions()); err != nil {
+					glog.V(2).Infof("[METADATA v%d] Failed to auto-create topic %s: %v", apiVersion, topic, err)
+					// Don't add to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
+				} else {
+					glog.V(2).Infof("[METADATA v%d] Successfully auto-created topic %s", apiVersion, topic)
+					topicsToReturn = append(topicsToReturn, topic)
+				}
+			}
 		}
 	}
 	glog.V(3).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
diff --git a/weed/mq/kafka/protocol/offset_fetch_pattern_test.go b/weed/mq/kafka/protocol/offset_fetch_pattern_test.go
index 024b9dbd5..6c2beb5da 100644
--- a/weed/mq/kafka/protocol/offset_fetch_pattern_test.go
+++ b/weed/mq/kafka/protocol/offset_fetch_pattern_test.go
@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import (
 	"context"
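The patch adds InvalidateTopicExistsCache to the interface and as a no-op on the mock, but the production implementation behind the interface is not shown here. Assuming the real handler memoizes TopicExists results in a TTL map, a minimal sketch of the cache and its invalidation path might look like the Go below; the package, struct, and field names are illustrative assumptions, not code from the SeaweedFS repository.

package topiccache

import (
	"sync"
	"time"
)

// topicExistsCache is a sketch of the kind of TTL cache the production
// handler is assumed to keep in front of broker TopicExists lookups.
type topicExistsCache struct {
	mu      sync.RWMutex
	entries map[string]topicExistsEntry
	ttl     time.Duration
}

type topicExistsEntry struct {
	exists    bool
	expiresAt time.Time
}

// InvalidateTopicExistsCache drops any cached entry so the next TopicExists
// call falls through to a fresh broker lookup.
func (c *topicExistsCache) InvalidateTopicExistsCache(topic string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, topic)
}

// lookup returns the cached answer only while it is within its TTL.
func (c *topicExistsCache) lookup(topic string) (exists bool, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, found := c.entries[topic]
	if !found || time.Now().After(e.expiresAt) {
		return false, false
	}
	return e.exists, true
}

// store records a fresh broker answer with the configured TTL.
func (c *topicExistsCache) store(topic string, exists bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.entries == nil {
		c.entries = make(map[string]topicExistsEntry)
	}
	c.entries[topic] = topicExistsEntry{exists: exists, expiresAt: time.Now().Add(c.ttl)}
}

Under that assumption, the metadata handler's InvalidateTopicExistsCache call simply forces the next TopicExists check to miss the cache and query the broker, which is the producer/consumer race the commit message describes.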