From 2f040a0fe4dd206d677fe109ff0f205bd94c68fe Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 17:34:06 -0700 Subject: [PATCH] fix undefined method errors --- weed/mq/kafka/protocol/handler.go | 145 +++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 3bd5e3eb4..69b63b27a 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1335,8 +1335,8 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re case 0, 1: return h.handleCreateTopicsV0V1(correlationID, requestBody) case 2, 3, 4, 5: - // Use non-flexible parser for CI clients - return h.handleCreateTopicsV2To4(correlationID, requestBody) + // Flexible versions (v2+) + return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) default: return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) } @@ -1635,6 +1635,147 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt return response, nil } +// handleCreateTopicsV2Plus handles CreateTopics API versions 2+ (flexible versions with compact arrays/strings) +// For simplicity and consistency with existing response builder, this parses the flexible request, +// converts it into the non-flexible v2-v4 body format, and reuses handleCreateTopicsV2To4 to build the response. +func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + fmt.Printf("DEBUG: CreateTopics V2+ (flexible) - parsing request of %d bytes (version %d)\n", len(requestBody), apiVersion) + + offset := 0 + + // Topics (compact array) + topicsCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topics compact array: %w", apiVersion, err) + } + offset += consumed + + type topicSpec struct { + name string + partitions uint32 + replication uint16 + } + topics := make([]topicSpec, 0, topicsCount) + + for i := uint32(0); i < topicsCount; i++ { + // Topic name (compact string) + name, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] name: %w", apiVersion, i, err) + } + offset += consumed + + if len(requestBody) < offset+6 { + return nil, fmt.Errorf("CreateTopics v%d: truncated partitions/replication for topic[%d]", apiVersion, i) + } + + partitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + replication := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + // Configs (compact array) - skip entries + cfgCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] configs array: %w", apiVersion, i, err) + } + offset += consumed + + for j := uint32(0); j < cfgCount; j++ { + // name (compact string) + _, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] name: %w", apiVersion, i, j, err) + } + offset += consumed + + // value (nullable compact string) + _, consumed, err = DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] value: %w", apiVersion, i, j, err) + } + offset += consumed + + // tagged fields for each config + _, consumed, err = DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] tagged fields: %w", apiVersion, i, j, err) + } + offset += consumed + } + + // Tagged fields for topic + _, consumed, err = DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] tagged fields: %w", apiVersion, i, err) + } + offset += consumed + + topics = append(topics, topicSpec{name: name, partitions: partitions, replication: replication}) + } + + // timeout_ms (int32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v%d: missing timeout_ms", apiVersion) + } + timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // validate_only (boolean) + if len(requestBody) < offset+1 { + return nil, fmt.Errorf("CreateTopics v%d: missing validate_only flag", apiVersion) + } + validateOnly := requestBody[offset] != 0 + offset += 1 + + // Tagged fields (top-level) + if _, consumed, err = DecodeTaggedFields(requestBody[offset:]); err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode top-level tagged fields: %w", apiVersion, err) + } + // offset += consumed // Not needed further + + // Reconstruct a non-flexible v2-like request body and reuse existing handler + // Format: topics(ARRAY) + timeout_ms(INT32) + validate_only(BOOLEAN) + var legacyBody []byte + + // topics count (int32) + legacyBody = append(legacyBody, 0, 0, 0, byte(len(topics))) + if len(topics) > 0 { + legacyBody[len(legacyBody)-1] = byte(len(topics)) + } + + for _, t := range topics { + // topic name (STRING) + nameLen := uint16(len(t.name)) + legacyBody = append(legacyBody, byte(nameLen>>8), byte(nameLen)) + legacyBody = append(legacyBody, []byte(t.name)...) + + // num_partitions (INT32) + legacyBody = append(legacyBody, byte(t.partitions>>24), byte(t.partitions>>16), byte(t.partitions>>8), byte(t.partitions)) + + // replication_factor (INT16) + legacyBody = append(legacyBody, byte(t.replication>>8), byte(t.replication)) + + // assignments array (INT32 count = 0) + legacyBody = append(legacyBody, 0, 0, 0, 0) + + // configs array (INT32 count = 0) + legacyBody = append(legacyBody, 0, 0, 0, 0) + } + + // timeout_ms + legacyBody = append(legacyBody, byte(timeoutMs>>24), byte(timeoutMs>>16), byte(timeoutMs>>8), byte(timeoutMs)) + + // validate_only + if validateOnly { + legacyBody = append(legacyBody, 1) + } else { + legacyBody = append(legacyBody, 0) + } + + return h.handleCreateTopicsV2To4(correlationID, legacyBody) +} + func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse minimal DeleteTopics request // Request format: client_id + timeout(4) + topics_array