diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index ff23cbd55..498c06644 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -74,6 +74,11 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", request.GroupID, request.MemberID, request.SessionTimeout) + fmt.Printf("DEBUG: JoinGroup protocols count: %d\n", len(request.GroupProtocols)) + for i, protocol := range request.GroupProtocols { + fmt.Printf("DEBUG: JoinGroup protocol[%d]: name='%s', metadata_len=%d, metadata_hex=%x\n", + i, protocol.Name, len(protocol.Metadata), protocol.Metadata) + } // Validate request if request.GroupID == "" { @@ -138,9 +143,34 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque JoinedAt: time.Now(), } - // Store protocol metadata for leader + // Store protocol metadata for leader - CRITICAL: Generate proper subscription metadata if len(request.GroupProtocols) > 0 { - member.Metadata = request.GroupProtocols[0].Metadata + // If client sends empty metadata, generate subscription metadata for available topics + if len(request.GroupProtocols[0].Metadata) == 0 { + // Generate subscription metadata for all available topics + // Format: version(2) + topics_count(4) + topics[] + availableTopics := h.getAvailableTopics() + fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) + + metadata := make([]byte, 0, 64) + // Version (2 bytes) - use version 0 + metadata = append(metadata, 0, 0) + // Topics count (4 bytes) + topicsCount := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics))) + metadata = append(metadata, topicsCount...) + // Topics (string array) + for _, topic := range availableTopics { + topicLen := make([]byte, 2) + binary.BigEndian.PutUint16(topicLen, uint16(len(topic))) + metadata = append(metadata, topicLen...) + metadata = append(metadata, []byte(topic)...) + } + member.Metadata = metadata + fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata) + } else { + member.Metadata = request.GroupProtocols[0].Metadata + } } // Add member to group @@ -668,3 +698,15 @@ func (h *Handler) serializeMemberAssignment(assignments []consumer.PartitionAssi return result } + +// getAvailableTopics returns list of available topics for subscription metadata +func (h *Handler) getAvailableTopics() []string { + h.topicsMu.RLock() + defer h.topicsMu.RUnlock() + + topics := make([]string, 0, len(h.topics)) + for topicName := range h.topics { + topics = append(topics, topicName) + } + return topics +}