diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index c19e5b612..8027aff6c 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -313,9 +313,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) + // TEMPORARY: Force v0 only until kafka-go compatibility issue is resolved response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 1) // max version 1 + response = append(response, 0, 0) // max version 0 (force v0 for kafka-go compatibility) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -480,7 +481,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) { // TEMPORARY: Use v0 format as base and add only the essential v1 differences // This is to debug the kafka-go parsing issue - + response := make([]byte, 0, 256) // Correlation ID @@ -571,12 +572,12 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) fmt.Printf("DEBUG: Metadata v1 response hex dump (%d bytes): %x\n", len(response), response) - + // CRITICAL DEBUG: Let's also compare with v0 format v0Response, _ := h.HandleMetadataV0(correlationID, requestBody) fmt.Printf("DEBUG: Metadata v0 response hex dump (%d bytes): %x\n", len(v0Response), v0Response) - fmt.Printf("DEBUG: v1 vs v0 length difference: %d bytes\n", len(response) - len(v0Response)) - + fmt.Printf("DEBUG: v1 vs v0 length difference: %d bytes\n", len(response)-len(v0Response)) + return response, nil } @@ -1109,6 +1110,10 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques return h.HandleMetadataV0(correlationID, requestBody) case 1: return h.HandleMetadataV1(correlationID, requestBody) + case 2, 3, 4, 5, 6: + // For now, use v1 format for higher versions (kafka-go compatibility) + // TODO: Implement proper v2-v6 formats with additional fields + return h.HandleMetadataV1(correlationID, requestBody) default: return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion) }