Browse Source

fix: Add topic cache invalidation and auto-creation on metadata requests

Add InvalidateTopicExistsCache method to SeaweedMQHandlerInterface and
implement cache refresh logic in metadata response handler.

When a consumer requests metadata for a topic that doesn't appear in the
cache (for example, one just created by a producer), force a fresh broker
check and, if the topic is still not found, auto-create it with the default
number of partitions.

This fix attempts to address the consumer stalling issue by:
1. Invalidating stale cache entries before checking the broker
2. Automatically creating topics on metadata requests (like Kafka's auto.create.topics.enable=true)
3. Returning topics to consumers more reliably

However, testing shows consumers still can't find topics even after creation,
suggesting a deeper issue with topic persistence or broker client communication.

Added InvalidateTopicExistsCache to mock handler as no-op for testing.

Note: Integration testing reveals that consumers get 'topic does not exist'
errors even when producers successfully create topics. This suggests the
real issue is either:
- Topics created by producers aren't visible to broker client queries
- Broker client TopicExists() doesn't work correctly
- There's a race condition in topic creation/registration

Requires further investigation of broker client implementation and SMQ
topic persistence logic.
pull/7329/head
chrislu 4 days ago
parent
commit
a5db489f82
  1. 2
      test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go
  2. 4
      weed/mq/kafka/gateway/test_mock_handler.go
  3. 22
      weed/mq/kafka/protocol/handler.go
  4. 2
      weed/mq/kafka/protocol/offset_fetch_pattern_test.go

2
test/kafka/kafka-client-loadtest/internal/consumer/consumer_stalling_test.go

@ -107,8 +107,6 @@ func TestEmptyFetchShouldNotStopConsumer(t *testing.T) {
nextFetchOffset := committedOffset + 1
// First attempt: get empty (transient - data might not be available yet)
firstFetchResult := 0 // bytes returned
// WRONG behavior (bug): Consumer sees 0 bytes and stops
// wrongConsumerLogic := (firstFetchResult == 0) // gives up!

4
weed/mq/kafka/gateway/test_mock_handler.go

@ -98,6 +98,10 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
return info, exists
}
// InvalidateTopicExistsCache is a deliberate no-op on the mock handler.
// It exists so the mock provides the InvalidateTopicExistsCache method
// declared on SeaweedMQHandlerInterface; the topic argument is ignored.
func (m *mockSeaweedMQHandler) InvalidateTopicExistsCache(topic string) {
// Mock handler doesn't cache topic existence, so this is a no-op
}
func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()

22
weed/mq/kafka/protocol/handler.go

@ -172,6 +172,7 @@ type SeaweedMQHandlerInterface interface {
CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error
DeleteTopic(topic string) error
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
InvalidateTopicExistsCache(topic string)
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
@ -1727,6 +1728,27 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
topicsToReturn = append(topicsToReturn, topic)
} else if h.seaweedMQHandler.TopicExists(topic) {
topicsToReturn = append(topicsToReturn, topic)
} else {
// Topic doesn't exist according to current cache, but let's check broker directly
// This handles the race condition where producers just created topics
// and consumers are requesting metadata before cache TTL expires
glog.V(3).Infof("[METADATA v%d] Topic %s not in cache, checking broker directly", apiVersion, topic)
// Force cache invalidation to do fresh broker check
h.seaweedMQHandler.InvalidateTopicExistsCache(topic)
if h.seaweedMQHandler.TopicExists(topic) {
glog.V(3).Infof("[METADATA v%d] Topic %s found on broker after cache refresh", apiVersion, topic)
topicsToReturn = append(topicsToReturn, topic)
} else {
glog.V(3).Infof("[METADATA v%d] Topic %s not found on broker, auto-creating with default partitions", apiVersion, topic)
// Auto-create non-system topics with default partitions (matches Kafka behavior)
if err := h.createTopicWithSchemaSupport(topic, h.GetDefaultPartitions()); err != nil {
glog.V(2).Infof("[METADATA v%d] Failed to auto-create topic %s: %v", apiVersion, topic, err)
// Don't add to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
} else {
glog.V(2).Infof("[METADATA v%d] Successfully auto-created topic %s", apiVersion, topic)
topicsToReturn = append(topicsToReturn, topic)
}
}
}
}
glog.V(3).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)

2
weed/mq/kafka/protocol/offset_fetch_pattern_test.go

@ -1,4 +1,4 @@
package kafka
package protocol
import (
"context"

Loading…
Cancel
Save