|
|
|
@ -1325,6 +1325,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req |
|
|
|
|
|
|
|
func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { |
|
|
|
fmt.Printf("DEBUG: *** CREATETOPICS REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - Request body size: %d bytes\n", len(requestBody)) |
|
|
|
|
|
|
|
if len(requestBody) < 2 { |
|
|
|
return nil, fmt.Errorf("CreateTopics request too short") |
|
|
|
@ -1333,13 +1334,22 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re |
|
|
|
// Parse based on API version
|
|
|
|
switch apiVersion { |
|
|
|
case 0, 1: |
|
|
|
return h.handleCreateTopicsV0V1(correlationID, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - Routing to v0/v1 handler\n") |
|
|
|
response, err := h.handleCreateTopicsV0V1(correlationID, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - v0/v1 handler returned, response size: %d bytes, err: %v\n", len(response), err) |
|
|
|
return response, err |
|
|
|
case 2, 3, 4: |
|
|
|
// kafka-go sends v2-4 in regular format, not compact
|
|
|
|
return h.handleCreateTopicsV2To4(correlationID, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - Routing to v2-4 handler\n") |
|
|
|
response, err := h.handleCreateTopicsV2To4(correlationID, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - v2-4 handler returned, response size: %d bytes, err: %v\n", len(response), err) |
|
|
|
return response, err |
|
|
|
case 5: |
|
|
|
// v5+ uses flexible format with compact arrays
|
|
|
|
return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - Routing to v5+ handler\n") |
|
|
|
response, err := h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics - v5+ handler returned, response size: %d bytes, err: %v\n", len(response), err) |
|
|
|
return response, err |
|
|
|
default: |
|
|
|
return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) |
|
|
|
} |
|
|
|
@ -1371,7 +1381,9 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by |
|
|
|
if isCompactFormat { |
|
|
|
fmt.Printf("DEBUG: CreateTopics v2-4 - Detected compact format\n") |
|
|
|
// Delegate to the compact format handler
|
|
|
|
return h.handleCreateTopicsV2Plus(correlationID, 2, requestBody) |
|
|
|
response, err := h.handleCreateTopicsV2Plus(correlationID, 2, requestBody) |
|
|
|
fmt.Printf("DEBUG: CreateTopics v2-4 - Compact format handler returned, response size: %d bytes, err: %v\n", len(response), err) |
|
|
|
return response, err |
|
|
|
} |
|
|
|
|
|
|
|
fmt.Printf("DEBUG: CreateTopics v2-4 - Detected regular format\n") |
|
|
|
@ -1517,6 +1529,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by |
|
|
|
response = append(response, 0xFF, 0xFF) |
|
|
|
} |
|
|
|
|
|
|
|
fmt.Printf("DEBUG: CreateTopics v2-4 - Regular format handler completed, response size: %d bytes\n", len(response)) |
|
|
|
return response, nil |
|
|
|
} |
|
|
|
|
|
|
|
|