diff --git a/test/kafka/metadata_version_test.go b/test/kafka/metadata_version_test.go new file mode 100644 index 000000000..c35766e49 --- /dev/null +++ b/test/kafka/metadata_version_test.go @@ -0,0 +1,37 @@ +package kafka + +import ( + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" +) + +func TestMetadataVersionComparison(t *testing.T) { + // Create handler + handler := protocol.NewHandler() + + // Add test topic + handler.AddTopicForTesting("test-topic", 1) + + // Set broker address + handler.SetBrokerAddress("127.0.0.1", 9092) + + // Test v0 response + v0Response, err := handler.HandleMetadataV0(12345, []byte{0, 0}) // empty client_id + empty topics + if err != nil { + t.Fatalf("v0 error: %v", err) + } + + // Test v1 response + v1Response, err := handler.HandleMetadataV1(12345, []byte{0, 0}) // empty client_id + empty topics + if err != nil { + t.Fatalf("v1 error: %v", err) + } + + fmt.Printf("Metadata v0 response (%d bytes): %x\n", len(v0Response), v0Response) + fmt.Printf("Metadata v1 response (%d bytes): %x\n", len(v1Response), v1Response) + + // Compare lengths + fmt.Printf("Length difference: v1 is %d bytes longer than v0\n", len(v1Response) - len(v0Response)) +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 608a0ad5b..804dcde69 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -294,10 +294,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) - // Strictly advertise v0 to ensure response matches client expectation + // Keep v0 only until v1 format issue is resolved + // TODO: Fix Metadata v1 format - kafka-go rejects our v1 response with "Unknown Topic Or Partition" response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 0) // max version 0 + response = append(response, 0, 0) // max version 0 (v1 has format issue) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -367,7 +368,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // broker: node_id(4) + host(STRING) + port(4) // topic: error_code(2) + name(STRING) + partitions(ARRAY) // partition: error_code(2) + partition_id(4) + leader(4) + replicas(ARRAY) + isr(ARRAY) -func (h *Handler) handleMetadataV0(correlationID uint32, requestBody []byte) ([]byte, error) { +func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]byte, error) { response := make([]byte, 0, 256) // Correlation ID @@ -454,6 +455,104 @@ func (h *Handler) handleMetadataV0(correlationID uint32, requestBody []byte) ([] return response, nil } +func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) { + response := make([]byte, 0, 256) + + // Correlation ID + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) + + // Brokers array length (4 bytes) - 1 broker (this gateway) + response = append(response, 0, 0, 0, 1) + + // Broker 0: node_id(4) + host(STRING) + port(4) + rack(NULLABLE_STRING) + response = append(response, 0, 0, 0, 0) // node_id = 0 + + // Use dynamic broker address set by the server + host := h.brokerHost + port := h.brokerPort + fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", host, port) + + // Host (STRING: 2 bytes length + bytes) + hostLen := uint16(len(host)) + response = append(response, byte(hostLen>>8), byte(hostLen)) + response = append(response, []byte(host)...) + + // Port (4 bytes) + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(port)) + response = append(response, portBytes...) + + // Rack (NULLABLE_STRING) - null (-1 length, 2 bytes) + response = append(response, 0xFF, 0xFF) + + // Cluster ID (NULLABLE_STRING) - null (-1 length, 2 bytes) + response = append(response, 0xFF, 0xFF) + + // Controller ID (4 bytes) - -1 (no controller) + response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + fmt.Printf("DEBUG: 🔍 METADATA v1 REQUEST - Requested: %v (empty=all)\n", requestedTopics) + + // Determine topics to return + h.topicsMu.RLock() + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = make([]string, 0, len(h.topics)) + for name := range h.topics { + topicsToReturn = append(topicsToReturn, name) + } + } else { + for _, name := range requestedTopics { + if _, exists := h.topics[name]; exists { + topicsToReturn = append(topicsToReturn, name) + } + } + } + h.topicsMu.RUnlock() + + // Topics array length (4 bytes) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) + response = append(response, topicsCountBytes...) + + // Topic entries (same format as v0) + for _, topicName := range topicsToReturn { + // error_code(2) = 0 + response = append(response, 0, 0) + + // name (STRING) + nameBytes := []byte(topicName) + nameLen := uint16(len(nameBytes)) + response = append(response, byte(nameLen>>8), byte(nameLen)) + response = append(response, nameBytes...) + + // is_internal(1) = false (v1 addition) + response = append(response, 0) + + // partitions array length (4 bytes) - 1 partition + response = append(response, 0, 0, 0, 1) + + // partition: error_code(2) + partition_id(4) + leader(4) + response = append(response, 0, 0) // error_code + response = append(response, 0, 0, 0, 0) // partition_id = 0 + response = append(response, 0, 0, 0, 0) // leader = 0 (this broker) + + // replicas: array length(4) + one broker id (0) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 0) + + // isr: array length(4) + one broker id (0) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 0) + } + + fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) + return response, nil +} func (h *Handler) parseMetadataTopics(requestBody []byte) []string { // Parse Metadata request to extract requested topics @@ -921,9 +1020,9 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { supportedVersions := map[uint16][2]uint16{ 18: {0, 3}, // ApiVersions: v0-v3 - 3: {0, 0}, // Metadata: only v0 for now + 3: {0, 0}, // Metadata: v0 only (v1 has format issue) 0: {0, 1}, // Produce: v0-v1 - 1: {0, 1}, // Fetch: v0-v1 + 1: {0, 1}, // Fetch: v0-v1 2: {0, 5}, // ListOffsets: v0-v5 19: {0, 4}, // CreateTopics: v0-v4 20: {0, 4}, // DeleteTopics: v0-v4 @@ -938,33 +1037,33 @@ func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { if versionRange, exists := supportedVersions[apiKey]; exists { minVer, maxVer := versionRange[0], versionRange[1] if apiVersion < minVer || apiVersion > maxVer { - return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)", + return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)", apiVersion, apiKey, minVer, maxVer) } return nil } - + return fmt.Errorf("unsupported API key: %d", apiKey) } // buildUnsupportedVersionResponse creates a proper Kafka error response func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, apiVersion uint16) ([]byte, error) { response := make([]byte, 0, 16) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Error code: UNSUPPORTED_VERSION (35) response = append(response, 0, 35) - + // Error message errorMsg := fmt.Sprintf("Unsupported version %d for API key %d", apiVersion, apiKey) errorMsgLen := uint16(len(errorMsg)) response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen)) response = append(response, []byte(errorMsg)...) - + return response, nil } @@ -972,7 +1071,9 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { switch apiVersion { case 0: - return h.handleMetadataV0(correlationID, requestBody) + return h.HandleMetadataV0(correlationID, requestBody) + case 1: + return h.HandleMetadataV1(correlationID, requestBody) default: return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion) } diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 1839a0bd0..671c88656 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -19,7 +19,7 @@ func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, request func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // DEBUG: Show version being handled fmt.Printf("DEBUG: Handling Produce v%d request\n", apiVersion) - + // DEBUG: Hex dump first 50 bytes to understand actual request format dumpLen := len(requestBody) if dumpLen > 50 {