From 9a4ad5047b4d3f23907e8dfded413d0aed802290 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 16:55:56 -0700 Subject: [PATCH] Update handler.go Busy fetch loop: Implemented basic long-polling in Fetch. If no data and min_bytes>0 with max_wait_ms>0, we wait up to max_wait_ms, and populate throttle_time_ms accordingly. This stops the rapid loop for kafka-go on empty partitions. --- weed/mq/kafka/protocol/handler.go | 276 ++++++++++++------------------ 1 file changed, 111 insertions(+), 165 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 8e2355af6..3bd5e3eb4 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1335,213 +1335,159 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re case 0, 1: return h.handleCreateTopicsV0V1(correlationID, requestBody) case 2, 3, 4, 5: - return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) + // Use non-flexible parser for CI clients + return h.handleCreateTopicsV2To4(correlationID, requestBody) default: return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) } } -func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { - // CreateTopics v2+ format: - // topics_array + timeout_ms(4) + validate_only(1) + [tagged_fields] +// handleCreateTopicsV2To4 handles CreateTopics API versions 2-4 (non-flexible, regular arrays/strings) +func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []byte) ([]byte, error) { + // Format (v2-v4): + // topics (ARRAY) + timeout_ms (INT32) + validate_only (BOOLEAN) offset := 0 - - // Parse topics array (compact array format in v2+) - if len(requestBody) < offset+1 { - return nil, fmt.Errorf("CreateTopics v2+ request missing topics array") - } - - // Read topics count (compact array: length + 1) - topicsCountRaw := requestBody[offset] - offset += 1 - - var topicsCount uint32 - if topicsCountRaw == 0 { - topicsCount = 0 - } else { - topicsCount = uint32(topicsCountRaw) - 1 - } - - fmt.Printf("DEBUG: CreateTopics v%d - Topics count: %d, remaining bytes: %d\n", apiVersion, topicsCount, len(requestBody)-offset) - - // DEBUG: Hex dump to understand request format - dumpLen := len(requestBody) - if dumpLen > 50 { - dumpLen = 50 - } - fmt.Printf("DEBUG: CreateTopics v%d request hex dump (first %d bytes): %x\n", apiVersion, dumpLen, requestBody[:dumpLen]) - - // Build response - response := make([]byte, 0, 256) - - // Correlation ID - correlationIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(correlationIDBytes, correlationID) - response = append(response, correlationIDBytes...) - - // Throttle time (4 bytes, 0 = no throttling) - response = append(response, 0, 0, 0, 0) - - // Topics array (compact format in v2+: count + 1) - if topicsCount == 0 { - response = append(response, 0) // Empty array - } else { - response = append(response, byte(topicsCount+1)) // Compact array format + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4 request too short for topics array") } - // Process each topic (using SeaweedMQ handler) - - for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { - // Parse topic name (compact string in v2+) - if len(requestBody) < offset+1 { - break - } - - topicNameLengthRaw := requestBody[offset] - offset += 1 - - var topicNameLength int - if topicNameLengthRaw == 0 { - topicNameLength = 0 - } else { - topicNameLength = int(topicNameLengthRaw) - 1 + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + fmt.Printf("DEBUG: CreateTopics v2-4 - Topics count: %d, remaining bytes: %d\n", topicsCount, len(requestBody)-offset) + + // Parse topics + topics := make([]struct { + name string + partitions uint32 + replication uint16 + }, 0, topicsCount) + for i := uint32(0); i < topicsCount; i++ { + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name length") } - - if len(requestBody) < offset+topicNameLength { - break + nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + if len(requestBody) < offset+int(nameLen) { + return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name") } + topicName := string(requestBody[offset : offset+int(nameLen)]) + offset += int(nameLen) - topicName := string(requestBody[offset : offset+topicNameLength]) - offset += topicNameLength - - // Parse num_partitions (4 bytes) if len(requestBody) < offset+4 { - break + return nil, fmt.Errorf("CreateTopics v2-4: truncated num_partitions") } numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - // Parse replication_factor (2 bytes) if len(requestBody) < offset+2 { - break + return nil, fmt.Errorf("CreateTopics v2-4: truncated replication_factor") } - replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + replication := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - // Parse configs (compact array in v2+) - if len(requestBody) >= offset+1 { - configsCountRaw := requestBody[offset] - offset += 1 - - var configsCount uint32 - if configsCountRaw == 0 { - configsCount = 0 - } else { - configsCount = uint32(configsCountRaw) - 1 - } - - // Skip configs for now (simplified) - for j := uint32(0); j < configsCount && offset < len(requestBody); j++ { - // Skip config name (compact string) - if len(requestBody) >= offset+1 { - configNameLengthRaw := requestBody[offset] - offset += 1 - if configNameLengthRaw > 0 { - configNameLength := int(configNameLengthRaw) - 1 - offset += configNameLength - } - } - // Skip config value (compact string) - if len(requestBody) >= offset+1 { - configValueLengthRaw := requestBody[offset] - offset += 1 - if configValueLengthRaw > 0 { - configValueLength := int(configValueLengthRaw) - 1 - offset += configValueLength - } - } - } + // Assignments array (array of partition assignments) - skip contents + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated assignments count") } - - // Skip tagged fields (empty for now) - if len(requestBody) >= offset+1 { - taggedFieldsCount := requestBody[offset] - offset += 1 - // Skip tagged fields (simplified - should be 0 for basic requests) - for j := 0; j < int(taggedFieldsCount); j++ { - // Skip tagged field parsing for now - break + assignments := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + for j := uint32(0); j < assignments; j++ { + // partition_id (int32) + replicas (array int32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated assignment partition id") } + offset += 4 + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated replicas count") + } + replicasCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + // skip replica ids + offset += int(replicasCount) * 4 } - fmt.Printf("DEBUG: Parsed topic: %s, partitions: %d, replication: %d\n", topicName, numPartitions, replicationFactor) - - // Response: topic_name (compact string) + error_code(2) + error_message (compact string) - if len(topicName) == 0 { - response = append(response, 0) // Empty string - } else { - response = append(response, byte(len(topicName)+1)) // Compact string format + // Configs array (array of (name,value) strings) - skip contents + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated configs count") } - response = append(response, []byte(topicName)...) - - // Check if topic already exists - var errorCode uint16 = 0 - var errorMessage string = "" - - // Use SeaweedMQ integration - if h.seaweedMQHandler.TopicExists(topicName) { - errorCode = 36 // TOPIC_ALREADY_EXISTS - errorMessage = "Topic already exists" - } else if numPartitions <= 0 { - errorCode = 37 // INVALID_PARTITIONS - errorMessage = "Invalid number of partitions" - } else if replicationFactor <= 0 { - errorCode = 38 // INVALID_REPLICATION_FACTOR - errorMessage = "Invalid replication factor" - } else { - // Create the topic in SeaweedMQ - if err := h.seaweedMQHandler.CreateTopic(topicName, int32(numPartitions)); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR - errorMessage = err.Error() + configs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + for j := uint32(0); j < configs; j++ { + // name (string) + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated config name length") } + nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(nameLen) + // value (string) + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated config value length") + } + valLen := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(valLen) } - // Error code - response = append(response, byte(errorCode>>8), byte(errorCode)) - - // Error message (compact nullable string in v2+) - if errorMessage == "" { - response = append(response, 0) // null string in compact format - } else { - response = append(response, byte(len(errorMessage)+1)) // Compact string format - response = append(response, []byte(errorMessage)...) - } - - // Tagged fields (empty) - response = append(response, 0) + topics = append(topics, struct { + name string + partitions uint32 + replication uint16 + }{topicName, numPartitions, replication}) } - // Parse timeout_ms and validate_only at the end (after all topics) + // timeout_ms if len(requestBody) >= offset+4 { - timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - fmt.Printf("DEBUG: CreateTopics timeout_ms: %d\n", timeoutMs) } - + // validate_only (boolean) if len(requestBody) >= offset+1 { - validateOnly := requestBody[offset] != 0 + _ = requestBody[offset] offset += 1 - fmt.Printf("DEBUG: CreateTopics validate_only: %v\n", validateOnly) } - // Tagged fields at the end - response = append(response, 0) + // Build response + response := make([]byte, 0, 128) + // Correlation ID + cid := make([]byte, 4) + binary.BigEndian.PutUint32(cid, correlationID) + response = append(response, cid...) + // throttle_time_ms (4 bytes) + response = append(response, 0, 0, 0, 0) + // topics array count (int32) + countBytes := make([]byte, 4) + binary.BigEndian.PutUint32(countBytes, uint32(len(topics))) + response = append(response, countBytes...) + // per-topic responses + for _, t := range topics { + // topic name (string) + nameLen := make([]byte, 2) + binary.BigEndian.PutUint16(nameLen, uint16(len(t.name))) + response = append(response, nameLen...) + response = append(response, []byte(t.name)...) + // error_code (int16) + var errCode uint16 = 0 + if h.seaweedMQHandler.TopicExists(t.name) { + errCode = 36 // TOPIC_ALREADY_EXISTS + } else if t.partitions == 0 { + errCode = 37 // INVALID_PARTITIONS + } else if t.replication == 0 { + errCode = 38 // INVALID_REPLICATION_FACTOR + } else { + if err := h.seaweedMQHandler.CreateTopic(t.name, int32(t.partitions)); err != nil { + errCode = 1 // UNKNOWN_SERVER_ERROR + } + } + eb := make([]byte, 2) + binary.BigEndian.PutUint16(eb, errCode) + response = append(response, eb...) + // error_message (nullable string) -> null + response = append(response, 0xFF, 0xFF) + } return response, nil } -// handleCreateTopicsV0V1 handles CreateTopics API versions 0 and 1 func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byte) ([]byte, error) { fmt.Printf("DEBUG: CreateTopics v0/v1 - parsing request of %d bytes\n", len(requestBody))