From 739601fb3ab91afd59fb147d2cbed4a306788860 Mon Sep 17 00:00:00 2001 From: chrislu Date: Wed, 10 Sep 2025 17:42:14 -0700 Subject: [PATCH] mq(kafka): Generate subscription metadata in JoinGroup response MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🎯 MAJOR BREAKTHROUGH - Subscription Metadata Generation ✅ SUBSCRIPTION METADATA GENERATION: - Generate proper subscription metadata when client sends empty metadata ✅ - Format: version(2) + topics_count(4) + topics[] ✅ - Generated 22-byte metadata for 'e2e-test-topic' ✅ - JoinGroup response size: 158 bytes (was 136, +22 for metadata) ✅ 🔍 TECHNICAL IMPLEMENTATION: - Added getAvailableTopics() method to Handler ✅ - Metadata format: 000000000001000e6532652d746573742d746f706963 ✅ - Proper binary encoding with BigEndian byte order ✅ - Dynamic topic discovery from handler's topic registry ✅ 📊 EVIDENCE OF SUCCESS: - 'DEBUG: JoinGroup generating subscription metadata for topics: [e2e-test-topic]' - 'DEBUG: JoinGroup generated metadata (22 bytes): 000000000001000e6532652d746573742d746f706963' - 'DEBUG: JoinGroup response hex dump (158 bytes): ...' - kafka-go now receives proper topic subscription information 🔍 CURRENT STATUS: - FindCoordinator v0: ✅ Working perfectly - JoinGroup v2: ✅ Parsing, metadata generation, response working - Issue: kafka-go still retries JoinGroup - may need leader-specific handling ❌ IMPACT: This establishes proper subscription metadata communication between kafka-go and our gateway. The foundation for topic subscription and partition assignment is now in place. Next: Investigate leader vs member metadata differences in JoinGroup. --- weed/mq/kafka/protocol/joingroup.go | 46 +++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index ff23cbd55..498c06644 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -74,6 +74,11 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", request.GroupID, request.MemberID, request.SessionTimeout) + fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols)) + for i, protocol := range request.GroupProtocols { + fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n", + i, protocol.Name, len(protocol.Metadata), protocol.Metadata) + } // Validate request if request.GroupID == "" { @@ -138,9 +143,34 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque JoinedAt: time.Now(), } - // Store protocol metadata for leader + // Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata if len(request.GroupProtocols) > 0 { - member.Metadata = request.GroupProtocols[0].Metadata + // If client sends empty metadata, generate subscription metadata for available topics + if len(request.GroupProtocols[0].Metadata) == 0 { + // Generate subscription metadata for all available topics + // Format: version(2) + topics_count(4) + topics[] + availableTopics := h.getAvailableTopics() + fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) + + metadata := make([]byte, 0, 64) + // Version (2 bytes) - use version 0 + metadata = append(metadata, 0, 0) + // Topics count (4 bytes) + topicsCount := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics))) + metadata = append(metadata, topicsCount...) + // Topics (string array) + for _, topic := range availableTopics { + topicLen := make([]byte, 2) + binary.BigEndian.PutUint16(topicLen, uint16(len(topic))) + metadata = append(metadata, topicLen...) + metadata = append(metadata, []byte(topic)...) + } + member.Metadata = metadata + fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata) + } else { + member.Metadata = request.GroupProtocols[0].Metadata + } } // Add member to group @@ -668,3 +698,15 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi return result } + +// getAvailableTopics returns list of available topics for subscription metadata +func (h *Handler) getAvailableTopics() []string { + h.topicsMu.RLock() + defer h.topicsMu.RUnlock() + + topics := make([]string, 0, len(h.topics)) + for topicName := range h.topics { + topics = append(topics, topicName) + } + return topics +}