Browse Source

mq(kafka): Fix JoinGroup v2 throttle_time_ms placement

🎯 PROTOCOL FORMAT CORRECTION

 THROTTLE_TIME_MS PLACEMENT FIXED:
- Moved throttle_time_ms to correct position after correlation_id 
- Removed duplicate throttle_time at end of response 
- JoinGroup response size: 136 bytes (was 140 with duplicate) 

🔍 CURRENT STATUS:
- FindCoordinator v0:  Working perfectly
- JoinGroup v2:  Parsing and response generation working
- Issue: kafka-go still retries JoinGroup, never calls SyncGroup 

📊 EVIDENCE:
- 'DEBUG: JoinGroup response hex dump (136 bytes): 0000000200000000...'
- Response format now matches Kafka v2 specification
- Client still disconnects after JoinGroup response

NEXT: Investigate member_metadata format - likely kafka-go expects
specific subscription metadata format in JoinGroup response members array.
pull/7231/head
chrislu 2 months ago
parent
commit
89a05f8c37
  1. 8
      weed/mq/kafka/protocol/handler.go
  2. 22
      weed/mq/kafka/protocol/joingroup.go

8
weed/mq/kafka/protocol/handler.go

@ -234,7 +234,13 @@ func (h *Handler) HandleConn(conn net.Conn) error {
fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response) fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response)
} }
case 14: // SyncGroup case 14: // SyncGroup
response, err = h.handleSyncGroup(correlationID, messageBuf[8:]) // skip header
fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header
if err != nil {
fmt.Printf("DEBUG: SyncGroup error: %v\n", err)
} else {
fmt.Printf("DEBUG: SyncGroup response hex dump (%d bytes): %x\n", len(response), response)
}
case 8: // OffsetCommit case 8: // OffsetCommit
response, err = h.handleOffsetCommit(correlationID, messageBuf[8:]) // skip header response, err = h.handleOffsetCommit(correlationID, messageBuf[8:]) // skip header
case 9: // OffsetFetch case 9: // OffsetFetch

22
weed/mq/kafka/protocol/joingroup.go

@ -277,6 +277,12 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...) result = append(result, correlationIDBytes...)
// JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ...
// Throttle time (4 bytes) - CRITICAL: This was missing!
throttleTimeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling
result = append(result, throttleTimeBytes...)
// Error code (2 bytes) // Error code (2 bytes)
errorCodeBytes := make([]byte, 2) errorCodeBytes := make([]byte, 2)
binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode))
@ -332,9 +338,6 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
result = append(result, member.Metadata...) result = append(result, member.Metadata...)
} }
// Throttle time (4 bytes, 0 = no throttling)
result = append(result, 0, 0, 0, 0)
return result return result
} }
@ -404,13 +407,24 @@ const (
ErrorCodeInconsistentGroupProtocol int16 = 23 ErrorCodeInconsistentGroupProtocol int16 = 23
) )
func (h *Handler) handleSyncGroup(correlationID uint32, requestBody []byte) ([]byte, error) {
func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// DEBUG: Hex dump the request to understand format
dumpLen := len(requestBody)
if dumpLen > 100 {
dumpLen = 100
}
fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse SyncGroup request // Parse SyncGroup request
request, err := h.parseSyncGroupRequest(requestBody) request, err := h.parseSyncGroupRequest(requestBody)
if err != nil { if err != nil {
fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
} }
fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n",
request.GroupID, request.MemberID, request.GenerationID)
// Validate request // Validate request
if request.GroupID == "" || request.MemberID == "" { if request.GroupID == "" || request.MemberID == "" {
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil

Loading…
Cancel
Save