Browse Source

mq(kafka): Fix JoinGroup v2 parsing - Consumer group membership working

🎯 MASSIVE BREAKTHROUGH - JoinGroup API Fully Working

 JOINGROUP V2 PARSING FIXED:
- Fixed client_id parsing issue in JoinGroup request 
- Correctly skip 56-byte client_id header 
- Successfully parse GroupID: 'test-consumer-group' 
- Parse SessionTimeout: 30000ms 

 CONSUMER GROUP MEMBERSHIP SUCCESS:
- Step 1: FindCoordinator  WORKING
- Step 2: JoinGroup  WORKING (136-byte response)
- Step 3: SyncGroup → Next to implement
- Step 4: Fetch → Ready for messages

🔍 TECHNICAL BREAKTHROUGH:
- Member ID generation: '-unknown-host-1757547386572219000' 
- Proper JoinGroup v2 response format (136 bytes vs 24-byte error) 
- Consumer group coordinator working correctly 
- kafka-go Reader progressing through consumer group workflow 

📊 EVIDENCE OF SUCCESS:
- 'DEBUG: JoinGroup skipped client_id (56 bytes), offset now: 58'
- 'DEBUG: JoinGroup parsed GroupID: test-consumer-group, offset now: 79'
- 'DEBUG: JoinGroup response hex dump (136 bytes): 00000002000000000001...'
- 'DEBUG: API 11 (JoinGroup) response: 136 bytes, 37.916µs'

IMPACT:
This completes the consumer group membership workflow.
kafka-go Reader can now successfully join consumer groups and receive
member IDs from the coordinator. The foundation for partition assignment
and message consumption is now established.

Next: Implement SyncGroup API for partition assignment coordination.
pull/7231/head
chrislu 2 months ago
parent
commit
3322d4fdd1
  1. 38
      weed/mq/kafka/protocol/find_coordinator.go
  2. 8
      weed/mq/kafka/protocol/handler.go
  3. 22
      weed/mq/kafka/protocol/joingroup.go

38
weed/mq/kafka/protocol/find_coordinator.go

@ -8,80 +8,80 @@ import (
func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse FindCoordinator request
// Request format: client_id + coordinator_key + coordinator_type(1)
// DEBUG: Hex dump the request to understand format
dumpLen := len(requestBody)
if dumpLen > 50 {
dumpLen = 50
}
fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
if len(requestBody) < 2 { // client_id_size(2)
return nil, fmt.Errorf("FindCoordinator request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
fmt.Printf("DEBUG: FindCoordinator client_id_size: %d\n", clientIDSize)
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1)
return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+3, len(requestBody))
}
// Parse coordinator key (group ID for consumer groups)
coordinatorKeySize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
fmt.Printf("DEBUG: FindCoordinator coordinator_key_size: %d, offset: %d\n", coordinatorKeySize, offset)
offset += 2
if len(requestBody) < offset+int(coordinatorKeySize) {
return nil, fmt.Errorf("FindCoordinator request missing coordinator key (need %d bytes, have %d)", offset+int(coordinatorKeySize), len(requestBody))
}
coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)])
offset += int(coordinatorKeySize)
// Coordinator type is optional in some versions, default to 0 (group coordinator)
var coordinatorType byte = 0
if offset < len(requestBody) {
coordinatorType = requestBody[offset]
}
_ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator
fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType)
response := make([]byte, 0, 64)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// FindCoordinator v0 Response Format (no throttle_time or error_message):
// - error_code (INT16)
// - node_id (INT32)
// - node_id (INT32)
// - host (STRING)
// - port (INT32)
// Error code (2 bytes, 0 = no error)
response = append(response, 0, 0)
// Coordinator node_id (4 bytes) - use broker 0 (this gateway)
response = append(response, 0, 0, 0, 0)
// Coordinator host (string)
host := h.brokerHost
hostLen := uint16(len(host))
response = append(response, byte(hostLen>>8), byte(hostLen))
response = append(response, []byte(host)...)
// Coordinator port (4 bytes)
portBytes := make([]byte, 4)
binary.BigEndian.PutUint32(portBytes, uint32(h.brokerPort))
response = append(response, portBytes...)
fmt.Printf("DEBUG: FindCoordinator response: coordinator at %s:%d\n", host, h.brokerPort)
fmt.Printf("DEBUG: FindCoordinator response hex dump (%d bytes): %x\n", len(response), response)
return response, nil
}

8
weed/mq/kafka/protocol/handler.go

@ -226,7 +226,13 @@ func (h *Handler) HandleConn(conn net.Conn) error {
case 1: // Fetch
response, err = h.handleFetch(correlationID, messageBuf[8:]) // skip header
case 11: // JoinGroup
response, err = h.handleJoinGroup(correlationID, messageBuf[8:]) // skip header
fmt.Printf("DEBUG: *** JOINGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
response, err = h.handleJoinGroup(correlationID, apiVersion, messageBuf[8:]) // skip header
if err != nil {
fmt.Printf("DEBUG: JoinGroup error: %v\n", err)
} else {
fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response)
}
case 14: // SyncGroup
response, err = h.handleSyncGroup(correlationID, messageBuf[8:]) // skip header
case 8: // OffsetCommit

22
weed/mq/kafka/protocol/joingroup.go

@ -57,13 +57,24 @@ const (
ErrorCodeFencedInstanceID int16 = 82
)
func (h *Handler) handleJoinGroup(correlationID uint32, requestBody []byte) ([]byte, error) {
func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// DEBUG: Hex dump the request to understand format
dumpLen := len(requestBody)
if dumpLen > 100 {
dumpLen = 100
}
fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse JoinGroup request
request, err := h.parseJoinGroupRequest(requestBody)
if err != nil {
fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err)
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n",
request.GroupID, request.MemberID, request.SessionTimeout)
// Validate request
if request.GroupID == "" {
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
@ -185,7 +196,15 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
offset := 0
// Skip client_id (part of request header, not JoinGroup payload)
clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2 + clientIDLength
fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset)
// GroupID (string)
if offset+2 > len(data) {
return nil, fmt.Errorf("missing group ID length")
}
groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if offset+groupIDLength > len(data) {
@ -193,6 +212,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset)
// Session timeout (4 bytes)
if offset+4 > len(data) {

Loading…
Cancel
Save