diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go index 783a559c4..c1f743f0b 100644 --- a/weed/mq/kafka/integration/broker_client.go +++ b/weed/mq/kafka/integration/broker_client.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -434,6 +435,7 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { ctx, cancel := context.WithTimeout(bc.ctx, 5*time.Second) defer cancel() + glog.V(2).Infof("[BrokerClient] TopicExists: Querying broker for topic %s", topicName) resp, err := bc.client.TopicExists(ctx, &mq_pb.TopicExistsRequest{ Topic: &schema_pb.Topic{ Namespace: "kafka", @@ -441,8 +443,10 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { }, }) if err != nil { + glog.V(1).Infof("[BrokerClient] TopicExists: ERROR for topic %s: %v", topicName, err) return false, fmt.Errorf("failed to check topic existence: %v", err) } + glog.V(2).Infof("[BrokerClient] TopicExists: Topic %s exists=%v", topicName, resp.Exists) return resp.Exists, nil } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 7c3ee0dba..73cdb46cb 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -93,11 +94,17 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a if !topicExists { // Use schema-aware topic creation for auto-created topics with configurable default partitions defaultPartitions := h.GetDefaultPartitions() + glog.V(1).Infof("[PRODUCE] Topic %s does not exist, auto-creating with %d partitions", topicName, defaultPartitions) if err := h.createTopicWithSchemaSupport(topicName, defaultPartitions); err != nil { + glog.V(0).Infof("[PRODUCE] ERROR: Failed to auto-create topic %s: %v", topicName, err) } else { - // Ledger initialization REMOVED - SMQ handles offsets natively + glog.V(1).Infof("[PRODUCE] Successfully auto-created topic %s", topicName) + // Invalidate cache immediately after creation so consumers can find it + h.seaweedMQHandler.InvalidateTopicExistsCache(topicName) topicExists = true } + } else { + glog.V(2).Infof("[PRODUCE] Topic %s already exists", topicName) } // Response: topic_name_size(2) + topic_name + partitions_array