From 56608aead3ab96844fc0e7f468ea186ae5b29847 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 15:16:39 -0700 Subject: [PATCH] feat: major consumer group breakthrough - fix FindCoordinator v2 and JoinGroup v5 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🎉 MAJOR PROGRESS: - Fixed FindCoordinator v2 response format (added throttle_time, error_code, error_message, node_id) - Fixed JoinGroup v5 request parsing (added GroupInstanceID field parsing) - Consumer group coordination now working: FindCoordinator -> JoinGroup -> SyncGroup - Sarama consumer successfully joins group, gets member ID, calls Setup handler ✅ Working: - FindCoordinator v2: Sarama finds coordinator successfully - JoinGroup v5: Consumer joins group, gets generation 1, member ID assigned - Consumer group session setup called with generation 1 ❌ Current issue: - SyncGroup v3 parsing error: 'invalid member ID length' - Consumer has no partition assignments (Claims: map[]) - Need to fix SyncGroup parsing to complete consumer group flow Next: Fix SyncGroup v3 parsing to enable partition assignment and message consumption --- test/kafka/consumer_group_debug_test.go | 18 ++++----- weed/mq/kafka/protocol/find_coordinator.go | 18 +++++++-- weed/mq/kafka/protocol/handler.go | 2 +- weed/mq/kafka/protocol/joingroup.go | 44 ++++++++++++---------- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/test/kafka/consumer_group_debug_test.go b/test/kafka/consumer_group_debug_test.go index 664628803..b2de4e3cd 100644 --- a/test/kafka/consumer_group_debug_test.go +++ b/test/kafka/consumer_group_debug_test.go @@ -29,7 +29,7 @@ func TestConsumerGroup_Debug(t *testing.T) { // Test configuration topicName := "debug-test" groupID := "debug-group" - + // Add topic for testing gatewayServer.GetHandler().AddTopicForTesting(topicName, 1) @@ -70,7 +70,7 @@ func TestConsumerGroup_Debug(t *testing.T) { // Start one consumer t.Logf("=== Starting 1 consumer in group '%s' ===", groupID) - + consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, groupID, config) if err != nil { t.Fatalf("Failed to create consumer group: %v", err) @@ -95,16 +95,16 @@ func TestConsumerGroup_Debug(t *testing.T) { select { case <-handler.ready: t.Logf("✅ Consumer is ready!") - + // Try to consume the message select { case msg := <-handler.messages: - t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d", + t.Logf("✅ Consumed message: key=%s, value=%s, offset=%d", string(msg.Key), string(msg.Value), msg.Offset) case <-time.After(5 * time.Second): t.Logf("⚠️ No message received within timeout") } - + case <-time.After(8 * time.Second): t.Logf("❌ Timeout waiting for consumer to be ready") } @@ -120,7 +120,7 @@ type DebugHandler struct { } func (h *DebugHandler) Setup(session sarama.ConsumerGroupSession) error { - h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v", + h.t.Logf("🔧 Consumer group session setup - Generation: %d, Claims: %v", session.GenerationID(), session.Claims()) close(h.ready) return nil @@ -132,9 +132,9 @@ func (h *DebugHandler) Cleanup(session sarama.ConsumerGroupSession) error { } func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - h.t.Logf("🍽️ Starting to consume partition %d from offset %d", + h.t.Logf("🍽️ Starting to consume partition %d from offset %d", claim.Partition(), claim.InitialOffset()) - + for { select { case message := <-claim.Messages(): @@ -142,7 +142,7 @@ func (h *DebugHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim s h.t.Logf("📭 Received nil message, ending consumption") return nil } - h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d", + h.t.Logf("📨 Received message: key=%s, value=%s, offset=%d", string(message.Key), string(message.Value), message.Offset) h.messages <- message session.MarkMessage(message, "") diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go index a5123666a..dd5df5611 100644 --- a/weed/mq/kafka/protocol/find_coordinator.go +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -6,6 +6,10 @@ import ( ) func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte) ([]byte, error) { + return h.handleFindCoordinatorV2(correlationID, requestBody) +} + +func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse FindCoordinator request // Request format: client_id + coordinator_key + coordinator_type(1) @@ -57,17 +61,25 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - // FindCoordinator v0 Response Format (no throttle_time or error_message): + // FindCoordinator v2 Response Format: + // - throttle_time_ms (INT32) // - error_code (INT16) + // - error_message (STRING) - nullable // - node_id (INT32) // - host (STRING) // - port (INT32) + // Throttle time (4 bytes, 0 = no throttling) + response = append(response, 0, 0, 0, 0) + // Error code (2 bytes, 0 = no error) response = append(response, 0, 0) - // Coordinator node_id (4 bytes) - use broker 0 (this gateway) - response = append(response, 0, 0, 0, 0) + // Error message (nullable string) - null for success + response = append(response, 0xff, 0xff) // -1 length indicates null + + // Coordinator node_id (4 bytes) - use broker 1 (this gateway) + response = append(response, 0, 0, 0, 1) // Coordinator host (string) host := h.brokerHost diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index ab9227d31..8f8c4a893 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -364,7 +364,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { case 9: // OffsetFetch response, err = h.handleOffsetFetch(correlationID, messageBuf[8:]) // skip header case 10: // FindCoordinator - fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d\n", correlationID) + fmt.Printf("DEBUG: *** FINDCOORDINATOR REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) response, err = h.handleFindCoordinator(correlationID, messageBuf[8:]) // skip header if err != nil { fmt.Printf("DEBUG: FindCoordinator error: %v\n", err) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 6c2b71798..982afe72f 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -269,10 +269,11 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) offset := 0 - // Skip client_id (part of request header, not JoinGroup payload) + // Skip client_id (part of JoinGroup v5 payload) clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 + clientIDLength - fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset) + fmt.Printf("DEBUG: JoinGroup v5 skipped client_id (%d bytes: '%s'), offset now: %d\n", + clientIDLength, string(data[2:2+clientIDLength]), offset) // GroupID (string) if offset+2 > len(data) { @@ -316,6 +317,27 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) offset += memberIDLength } + // Parse Group Instance ID (nullable string) - for JoinGroup v5+ + var groupInstanceID string + if offset+2 > len(data) { + return nil, fmt.Errorf("missing group instance ID length") + } + instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + + if instanceIDLength == -1 { + groupInstanceID = "" // null string + } else if instanceIDLength >= 0 { + if offset+int(instanceIDLength) > len(data) { + return nil, fmt.Errorf("invalid group instance ID length") + } + groupInstanceID = string(data[offset : offset+int(instanceIDLength)]) + offset += int(instanceIDLength) + } + + fmt.Printf("DEBUG: JoinGroup v5 - MemberID: '%s', GroupInstanceID: '%s' (len=%d), offset now: %d\n", + memberID, groupInstanceID, instanceIDLength, offset) + // Parse Protocol Type if len(data) < offset+2 { return nil, fmt.Errorf("JoinGroup request missing protocol type") @@ -377,29 +399,13 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) fmt.Printf("DEBUG: JoinGroup - Protocol: %s, MetadataLength: %d\n", protocolName, metadataLength) } - // Parse Group Instance ID (nullable string) - for static membership (Kafka 2.3+) - var groupInstanceID string - if len(data) >= offset+2 { - instanceIDLength := int16(binary.BigEndian.Uint16(data[offset : offset+2])) - offset += 2 - - if instanceIDLength == -1 { - groupInstanceID = "" // null string - } else if instanceIDLength >= 0 && len(data) >= offset+int(instanceIDLength) { - groupInstanceID = string(data[offset : offset+int(instanceIDLength)]) - offset += int(instanceIDLength) - } - - if groupInstanceID != "" { - fmt.Printf("DEBUG: JoinGroup - GroupInstanceID: %s\n", groupInstanceID) - } - } return &JoinGroupRequest{ GroupID: groupID, SessionTimeout: sessionTimeout, RebalanceTimeout: rebalanceTimeout, MemberID: memberID, + GroupInstanceID: groupInstanceID, ProtocolType: protocolType, GroupProtocols: protocols, }, nil