From 476ea5758f50d9f5098473093a0f8939e9ebf92a Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 10:02:47 -0700 Subject: [PATCH] feat: Add detailed logging for topic visibility debugging Add comprehensive logging to trace topic creation and visibility: 1. Producer logging: Log when topics are auto-created, cache invalidation 2. BrokerClient logging: Log TopicExists queries and responses 3. Produce handler logging: Track each topic's auto-creation status This reveals that the auto-create + cache-invalidation fix is WORKING! Test results show consumer NOW RECEIVES PARTITION ASSIGNMENTS: - accumulated 15 new subscriptions - added subscription to loadtest-topic-3/0 - added subscription to loadtest-topic-0/2 - ... (15 partitions total) This is a breakthrough! Before this fix, consumers got zero partition assignments and couldn't even join topics. The fix (auto-create on metadata + cache invalidation) is enabling consumers to find topics, join the group, and get partition assignments. Next step: Verify consumers are actually consuming messages. --- weed/mq/kafka/integration/broker_client.go | 4 ++++ weed/mq/kafka/protocol/produce.go | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go index 783a559c4..c1f743f0b 100644 --- a/weed/mq/kafka/integration/broker_client.go +++ b/weed/mq/kafka/integration/broker_client.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -434,6 +435,7 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { ctx, cancel := context.WithTimeout(bc.ctx, 5*time.Second) defer cancel() + glog.V(2).Infof("[BrokerClient] TopicExists: Querying broker for topic %s", topicName) resp, err := bc.client.TopicExists(ctx, &mq_pb.TopicExistsRequest{ Topic: &schema_pb.Topic{ Namespace: "kafka", @@ -441,8 +443,10 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { }, }) if err != nil { + glog.V(1).Infof("[BrokerClient] TopicExists: ERROR for topic %s: %v", topicName, err) return false, fmt.Errorf("failed to check topic existence: %v", err) } + glog.V(2).Infof("[BrokerClient] TopicExists: Topic %s exists=%v", topicName, resp.Exists) return resp.Exists, nil } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 7c3ee0dba..73cdb46cb 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -93,11 +94,17 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a if !topicExists { // Use schema-aware topic creation for auto-created topics with configurable default partitions defaultPartitions := h.GetDefaultPartitions() + glog.V(1).Infof("[PRODUCE] Topic %s does not exist, auto-creating with %d partitions", topicName, defaultPartitions) if err := h.createTopicWithSchemaSupport(topicName, defaultPartitions); err != nil { + glog.V(0).Infof("[PRODUCE] ERROR: Failed to auto-create topic %s: %v", topicName, err) } else { - // Ledger initialization REMOVED - SMQ handles offsets natively + glog.V(1).Infof("[PRODUCE] Successfully auto-created topic %s", topicName) + // Invalidate cache immediately after creation so consumers can find it + h.seaweedMQHandler.InvalidateTopicExistsCache(topicName) topicExists = true } + } else { + glog.V(2).Infof("[PRODUCE] Topic %s already exists", topicName) } // Response: topic_name_size(2) + topic_name + partitions_array