diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index ffa4e06e9..608a0ad5b 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -189,12 +189,23 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		fmt.Printf("DEBUG: API %d (%s) v%d - Correlation: %d, Size: %d\n",
 			apiKey, apiName, apiVersion, correlationID, size)
 
-		// TODO: IMPORTANT - API version validation is missing
-		// Different API versions have different request/response formats
-		// Need to validate apiVersion against supported versions for each API
-		// Currently ignoring apiVersion completely which may cause parsing errors
+		// Validate API version against what we support
+		if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
+			// Return proper Kafka error response for unsupported version
+			response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion)
+			if writeErr != nil {
+				return fmt.Errorf("build error response: %w", writeErr)
+			}
+			// Send error response and continue to next request
+			responseSizeBytes := make([]byte, 4)
+			binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
+			w.Write(responseSizeBytes)
+			w.Write(response)
+			w.Flush()
+			continue
+		}
 
-		// Handle the request based on API key
+		// Handle the request based on API key and version
 		var response []byte
 		var err error
 
@@ -202,8 +213,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		case 18: // ApiVersions
 			response, err = h.handleApiVersions(correlationID)
 		case 3: // Metadata
-			// For now, serve Metadata v0 to avoid version mismatches
-			response, err = h.handleMetadataV0(correlationID, messageBuf[8:])
+			response, err = h.handleMetadata(correlationID, apiVersion, messageBuf[8:])
 		case 2: // ListOffsets
 			response, err = h.handleListOffsets(correlationID, messageBuf[8:]) // skip header
 		case 19: // CreateTopics
@@ -212,7 +222,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 			response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header
 		case 0: // Produce
 			fmt.Printf("DEBUG: *** PRODUCE REQUEST RECEIVED *** Correlation: %d\n", correlationID)
-			response, err = h.handleProduce(correlationID, messageBuf[8:]) // skip header
+			response, err = h.handleProduce(correlationID, apiVersion, messageBuf[8:])
 		case 1: // Fetch
 			response, err = h.handleFetch(correlationID, messageBuf[8:]) // skip header
 		case 11: // JoinGroup
@@ -444,139 +454,6 @@ func (h *Handler) handleMetadataV0(correlationID uint32, requestBody []byte) ([]
 	return response, nil
 }
 
-func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]byte, error) {
-	// Parse Metadata request to extract requested topics and auto-create them
-	// This implements auto.create.topics.enable=true behavior
-	// Request format: client_id + topics_array (if topics_count > 0)
-	// Response format: correlation_id(4) + throttle_time(4) + brokers + cluster_id + controller_id + topics
-
-	response := make([]byte, 0, 256)
-
-	// Correlation ID
-	correlationIDBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
-	response = append(response, correlationIDBytes...)
-
-	// Throttle time (4 bytes, 0 = no throttling)
-	response = append(response, 0, 0, 0, 0)
-
-	// Brokers array length (4 bytes) - 1 broker (this gateway)
-	response = append(response, 0, 0, 0, 1)
-
-	// Broker 0: node_id(4) + host + port(4) + rack
-	response = append(response, 0, 0, 0, 0) // node_id = 0
-
-	// Use dynamic broker address set by the server
-	host := h.brokerHost
-	port := h.brokerPort
-
-	fmt.Printf("DEBUG: Advertising broker at %s:%d\n", host, port)
-
-	response = append(response, 0, byte(len(host)))
-	response = append(response, []byte(host)...)
-
-	// Port (4 bytes) - Use actual gateway port
-	portBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(portBytes, uint32(port))
-	response = append(response, portBytes...)
-
-	// Rack - nullable string, using null (-1 length)
-	response = append(response, 0xFF, 0xFF) // null rack
-
-	// Cluster ID - nullable string, using null
-	response = append(response, 0xFF, 0xFF) // null cluster_id
-
-	// Controller ID (4 bytes) - -1 (no controller)
-	response = append(response, 0xFF, 0xFF, 0xFF, 0xFF)
-
-	// TEMP: Removed v7+ fields to test with Metadata v1
-	// Cluster authorized operations removed for v1 compatibility
-
-	// Parse topics from request (for metadata discovery)
-	requestedTopics := h.parseMetadataTopics(requestBody)
-	fmt.Printf("DEBUG: 🔍 METADATA REQUEST - Requested topics: %v (empty=all topics)\n", requestedTopics)
-
-	// Build topics array response - return existing topics only
-	h.topicsMu.RLock()
-
-	// Debug: Show all available topics
-	availableTopics := make([]string, 0, len(h.topics))
-	for topicName := range h.topics {
-		availableTopics = append(availableTopics, topicName)
-	}
-	fmt.Printf("DEBUG: 📋 AVAILABLE TOPICS: %v\n", availableTopics)
-
-	var topicsToReturn []string
-	if len(requestedTopics) == 0 {
-		// If no specific topics requested, return all existing topics
-		for topicName := range h.topics {
-			topicsToReturn = append(topicsToReturn, topicName)
-		}
-		fmt.Printf("DEBUG: 📤 RETURNING all existing topics: %v\n", topicsToReturn)
-	} else {
-		// Return only requested topics that exist
-		fmt.Printf("DEBUG: 🔍 CHECKING requested topics: %v\n", requestedTopics)
-		for _, topicName := range requestedTopics {
-			if _, exists := h.topics[topicName]; exists {
-				topicsToReturn = append(topicsToReturn, topicName)
-				fmt.Printf("DEBUG: ✅ Found requested topic: '%s'\n", topicName)
-			} else {
-				fmt.Printf("DEBUG: ❌ Topic NOT FOUND: '%s'\n", topicName)
-			}
-		}
-		fmt.Printf("DEBUG: 📤 RETURNING requested existing topics: %v\n", topicsToReturn)
-	}
-	h.topicsMu.RUnlock()
-
-	// Topics array length (4 bytes)
-	topicsCountBytes := make([]byte, 4)
-	binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
-	response = append(response, topicsCountBytes...)
-
-	// Build each topic response
-	for _, topicName := range topicsToReturn {
-		// fmt.Printf("DEBUG: Building topic response for: '%s' (length: %d)\n", topicName, len(topicName))
-
-		// Topic error code (2 bytes) - 0 = no error
-		response = append(response, 0, 0)
-
-		// Topic name
-		topicNameBytes := []byte(topicName)
-		topicNameLen := make([]byte, 2)
-		binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes)))
-		response = append(response, topicNameLen...)
-		response = append(response, topicNameBytes...)
-
-		// TEMP: Removed v7+ fields for v1 compatibility
-		// Topic UUID and is_internal_topic removed
-
-		// Partitions array length (4 bytes) - 1 partition
-		response = append(response, 0, 0, 0, 1)
-		// fmt.Printf("DEBUG: Added partitions count: 1\n")
-
-		// Partition 0: error_code(2) + partition_id(4) + leader_id(4) + replicas + isr
-		response = append(response, 0, 0)       // no error
-		response = append(response, 0, 0, 0, 0) // partition_id = 0
-		response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker)
-
-		// Replicas array: length(4) + broker_ids
-		response = append(response, 0, 0, 0, 1) // replicas count = 1
-		response = append(response, 0, 0, 0, 0) // replica broker_id = 0
-
-		// ISR (In-Sync Replicas) array: length(4) + broker_ids
-		response = append(response, 0, 0, 0, 1) // isr count = 1
-		response = append(response, 0, 0, 0, 0) // isr broker_id = 0
-
-		// Debug: Show detailed partition info
-		fmt.Printf("DEBUG: Partition 0 - leader_id=0, replicas=[0], isr=[0]\n")
-
-		// TEMP: Removed v7+ topic authorized operations for v1 compatibility
-	}
-
-	fmt.Printf("DEBUG: Metadata response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
-	fmt.Printf("DEBUG: Metadata response full hex dump (%d bytes): %x\n", len(response), response)
-	return response, nil
-}
 
 func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
 	// Parse Metadata request to extract requested topics
@@ -1040,6 +917,67 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
 	return response, nil
 }
 
+// validateAPIVersion checks if we support the requested API version
+func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
+	supportedVersions := map[uint16][2]uint16{
+		18: {0, 3}, // ApiVersions: v0-v3
+		3:  {0, 0}, // Metadata: only v0 for now
+		0:  {0, 1}, // Produce: v0-v1
+		1:  {0, 1}, // Fetch: v0-v1
+		2:  {0, 5}, // ListOffsets: v0-v5
+		19: {0, 4}, // CreateTopics: v0-v4
+		20: {0, 4}, // DeleteTopics: v0-v4
+		11: {0, 7}, // JoinGroup: v0-v7
+		14: {0, 5}, // SyncGroup: v0-v5
+		8:  {0, 8}, // OffsetCommit: v0-v8
+		9:  {0, 8}, // OffsetFetch: v0-v8
+		12: {0, 4}, // Heartbeat: v0-v4
+		13: {0, 4}, // LeaveGroup: v0-v4
+	}
+
+	if versionRange, exists := supportedVersions[apiKey]; exists {
+		minVer, maxVer := versionRange[0], versionRange[1]
+		if apiVersion < minVer || apiVersion > maxVer {
+			return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)",
+				apiVersion, apiKey, minVer, maxVer)
+		}
+		return nil
+	}
+
+	return fmt.Errorf("unsupported API key: %d", apiKey)
+}
+
+// buildUnsupportedVersionResponse creates a proper Kafka error response
+func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, apiVersion uint16) ([]byte, error) {
+	response := make([]byte, 0, 16)
+
+	// Correlation ID
+	correlationIDBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
+	response = append(response, correlationIDBytes...)
+
+	// Error code: UNSUPPORTED_VERSION (35)
+	response = append(response, 0, 35)
+
+	// Error message
+	errorMsg := fmt.Sprintf("Unsupported version %d for API key %d", apiVersion, apiKey)
+	errorMsgLen := uint16(len(errorMsg))
+	response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
+	response = append(response, []byte(errorMsg)...)
+
+	return response, nil
+}
+
+// handleMetadata routes to the appropriate version-specific handler
+func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	switch apiVersion {
+	case 0:
+		return h.handleMetadataV0(correlationID, requestBody)
+	default:
+		return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
+	}
+}
+
 // getAPIName returns a human-readable name for Kafka API keys (for debugging)
 func getAPIName(apiKey uint16) string {
 	switch apiKey {
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index f923f51b2..1839a0bd0 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -6,7 +6,20 @@ import (
 	"time"
 )
 
-func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byte, error) {
+func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	// Version-specific handling
+	switch apiVersion {
+	case 0, 1:
+		return h.handleProduceV0V1(correlationID, apiVersion, requestBody)
+	default:
+		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
+	}
+}
+
+func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	// DEBUG: Show version being handled
+	fmt.Printf("DEBUG: Handling Produce v%d request\n", apiVersion)
+
 	// DEBUG: Hex dump first 50 bytes to understand actual request format
 	dumpLen := len(requestBody)
 	if dumpLen > 50 {
@@ -14,7 +27,7 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 	}
 	fmt.Printf("DEBUG: Produce request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
 
-	// Parse minimal Produce request
+	// Parse Produce v0/v1 request
 	// Request format: client_id + acks(2) + timeout(4) + topics_array
 	if len(requestBody) < 8 { // client_id_size(2) + acks(2) + timeout(4)
@@ -24,11 +37,11 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 	// Skip client_id
 	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
 	fmt.Printf("DEBUG: Client ID size: %d\n", clientIDSize)
-	
+
 	if len(requestBody) < 2+int(clientIDSize) {
 		return nil, fmt.Errorf("Produce request client_id too short")
 	}
-	
+
 	clientID := string(requestBody[2 : 2+int(clientIDSize)])
 	offset := 2 + int(clientIDSize)
 	fmt.Printf("DEBUG: Client ID: '%s', offset after client_id: %d\n", clientID, offset)
@@ -41,7 +54,7 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 	acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
 	offset += 2
 	fmt.Printf("DEBUG: Acks: %d, offset after acks: %d\n", acks, offset)
-	
+
 	timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 	fmt.Printf("DEBUG: Timeout: %d, offset after timeout: %d\n", timeout, offset)
@@ -83,13 +96,13 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 		// Parse partitions count
 		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 		offset += 4
-		
+
 		fmt.Printf("DEBUG: Produce request for topic '%s' (%d partitions)\n", topicName, partitionsCount)
 
 		// Check if topic exists, auto-create if it doesn't (simulates auto.create.topics.enable=true)
 		h.topicsMu.Lock()
 		_, topicExists := h.topics[topicName]
-		
+
 		// Debug: show all existing topics
 		existingTopics := make([]string, 0, len(h.topics))
 		for tName := range h.topics {
@@ -105,7 +118,7 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 		}
 		// Initialize ledger for partition 0
 		h.GetOrCreateLedger(topicName, 0)
-		topicExists = true  // CRITICAL FIX: Update the flag after creating the topic
+		topicExists = true // CRITICAL FIX: Update the flag after creating the topic
 		fmt.Printf("DEBUG: Topic '%s' auto-created successfully, topicExists = %v\n", topicName, topicExists)
 	}
 	h.topicsMu.Unlock()
@@ -212,7 +225,7 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
 // TODO: CRITICAL - This is a simplified parser that needs complete rewrite for protocol compatibility
 // Missing:
 // - Proper record batch format parsing (v0, v1, v2)
-// - Compression support (gzip, snappy, lz4, zstd) 
+// - Compression support (gzip, snappy, lz4, zstd)
 // - CRC32 validation
 // - Transaction markers and control records
 // - Individual record extraction (key, value, headers, timestamps)
@@ -266,7 +279,7 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
 // extractFirstRecord extracts the first record from a Kafka record set (simplified)
 // TODO: CRITICAL - This function returns placeholder data instead of parsing real records
 // For real client compatibility, need to:
-// - Parse record batch header properly 
+// - Parse record batch header properly
 // - Extract actual key/value from first record in batch
 // - Handle compressed record batches
 // - Support all record formats (v0, v1, v2)
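For reviewers, a minimal standalone sketch of the round trip the new validation path performs: it mirrors the range check in `validateAPIVersion` and the size-prefixed frame `HandleConn` writes when `buildUnsupportedVersionResponse` fires (correlation_id, error code 35 = UNSUPPORTED_VERSION, then a length-prefixed message). The names and the reduced version table are illustrative copies of the diff above, not part of the patch.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Subset of the supported-version table from the diff (illustrative only).
var supportedVersions = map[uint16][2]uint16{
	18: {0, 3}, // ApiVersions: v0-v3
	3:  {0, 0}, // Metadata: only v0 for now
	0:  {0, 1}, // Produce: v0-v1
}

// validate mirrors Handler.validateAPIVersion: unknown API keys and
// out-of-range versions are both rejected.
func validate(apiKey, apiVersion uint16) error {
	r, ok := supportedVersions[apiKey]
	if !ok {
		return fmt.Errorf("unsupported API key: %d", apiKey)
	}
	if apiVersion < r[0] || apiVersion > r[1] {
		return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)",
			apiVersion, apiKey, r[0], r[1])
	}
	return nil
}

// unsupportedVersionFrame reproduces what HandleConn puts on the wire:
// size(4) + correlation_id(4) + error_code(2, =35) + message_len(2) + message.
func unsupportedVersionFrame(correlationID uint32, apiKey, apiVersion uint16) []byte {
	msg := fmt.Sprintf("Unsupported version %d for API key %d", apiVersion, apiKey)

	body := make([]byte, 4)
	binary.BigEndian.PutUint32(body, correlationID)
	body = append(body, 0, 35) // UNSUPPORTED_VERSION
	body = append(body, byte(len(msg)>>8), byte(len(msg)))
	body = append(body, msg...)

	frame := make([]byte, 4, 4+len(body))
	binary.BigEndian.PutUint32(frame, uint32(len(body)))
	return append(frame, body...)
}

func main() {
	fmt.Println(validate(0, 1))                           // <nil>: Produce v1 is in range
	fmt.Println(validate(3, 4))                           // error: Metadata supports only v0
	fmt.Printf("% x\n", unsupportedVersionFrame(7, 3, 4)) // bytes a client would receive
}
```

One caveat worth noting: real Kafka brokers encode errors inside each API's own response schema rather than a generic frame, so clients may not parse this simplified error body; the sketch only documents what the gateway currently sends before `continue`-ing to the next request.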