Browse Source

fix undefined method errors

pull/7231/head
chrislu 2 months ago
parent
commit
2f040a0fe4
  1. 145
      weed/mq/kafka/protocol/handler.go

145
weed/mq/kafka/protocol/handler.go

@ -1335,8 +1335,8 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re
case 0, 1: case 0, 1:
return h.handleCreateTopicsV0V1(correlationID, requestBody) return h.handleCreateTopicsV0V1(correlationID, requestBody)
case 2, 3, 4, 5: case 2, 3, 4, 5:
// Use non-flexible parser for CI clients
return h.handleCreateTopicsV2To4(correlationID, requestBody)
// Flexible versions (v2+)
return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody)
default: default:
return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion)
} }
@ -1635,6 +1635,147 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt
return response, nil return response, nil
} }
// handleCreateTopicsV2Plus handles CreateTopics API versions 2+ (flexible versions with compact arrays/strings)
// For simplicity and consistency with existing response builder, this parses the flexible request,
// converts it into the non-flexible v2-v4 body format, and reuses handleCreateTopicsV2To4 to build the response.
func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
fmt.Printf("DEBUG: CreateTopics V2+ (flexible) - parsing request of %d bytes (version %d)\n", len(requestBody), apiVersion)
offset := 0
// Topics (compact array)
topicsCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topics compact array: %w", apiVersion, err)
}
offset += consumed
type topicSpec struct {
name string
partitions uint32
replication uint16
}
topics := make([]topicSpec, 0, topicsCount)
for i := uint32(0); i < topicsCount; i++ {
// Topic name (compact string)
name, consumed, err := DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] name: %w", apiVersion, i, err)
}
offset += consumed
if len(requestBody) < offset+6 {
return nil, fmt.Errorf("CreateTopics v%d: truncated partitions/replication for topic[%d]", apiVersion, i)
}
partitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
replication := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
// Configs (compact array) - skip entries
cfgCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] configs array: %w", apiVersion, i, err)
}
offset += consumed
for j := uint32(0); j < cfgCount; j++ {
// name (compact string)
_, consumed, err := DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] name: %w", apiVersion, i, j, err)
}
offset += consumed
// value (nullable compact string)
_, consumed, err = DecodeFlexibleString(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] value: %w", apiVersion, i, j, err)
}
offset += consumed
// tagged fields for each config
_, consumed, err = DecodeTaggedFields(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] tagged fields: %w", apiVersion, i, j, err)
}
offset += consumed
}
// Tagged fields for topic
_, consumed, err = DecodeTaggedFields(requestBody[offset:])
if err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] tagged fields: %w", apiVersion, i, err)
}
offset += consumed
topics = append(topics, topicSpec{name: name, partitions: partitions, replication: replication})
}
// timeout_ms (int32)
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("CreateTopics v%d: missing timeout_ms", apiVersion)
}
timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// validate_only (boolean)
if len(requestBody) < offset+1 {
return nil, fmt.Errorf("CreateTopics v%d: missing validate_only flag", apiVersion)
}
validateOnly := requestBody[offset] != 0
offset += 1
// Tagged fields (top-level)
if _, consumed, err = DecodeTaggedFields(requestBody[offset:]); err != nil {
return nil, fmt.Errorf("CreateTopics v%d: decode top-level tagged fields: %w", apiVersion, err)
}
// offset += consumed // Not needed further
// Reconstruct a non-flexible v2-like request body and reuse existing handler
// Format: topics(ARRAY) + timeout_ms(INT32) + validate_only(BOOLEAN)
var legacyBody []byte
// topics count (int32)
legacyBody = append(legacyBody, 0, 0, 0, byte(len(topics)))
if len(topics) > 0 {
legacyBody[len(legacyBody)-1] = byte(len(topics))
}
for _, t := range topics {
// topic name (STRING)
nameLen := uint16(len(t.name))
legacyBody = append(legacyBody, byte(nameLen>>8), byte(nameLen))
legacyBody = append(legacyBody, []byte(t.name)...)
// num_partitions (INT32)
legacyBody = append(legacyBody, byte(t.partitions>>24), byte(t.partitions>>16), byte(t.partitions>>8), byte(t.partitions))
// replication_factor (INT16)
legacyBody = append(legacyBody, byte(t.replication>>8), byte(t.replication))
// assignments array (INT32 count = 0)
legacyBody = append(legacyBody, 0, 0, 0, 0)
// configs array (INT32 count = 0)
legacyBody = append(legacyBody, 0, 0, 0, 0)
}
// timeout_ms
legacyBody = append(legacyBody, byte(timeoutMs>>24), byte(timeoutMs>>16), byte(timeoutMs>>8), byte(timeoutMs))
// validate_only
if validateOnly {
legacyBody = append(legacyBody, 1)
} else {
legacyBody = append(legacyBody, 0)
}
return h.handleCreateTopicsV2To4(correlationID, legacyBody)
}
func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) { func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal DeleteTopics request // Parse minimal DeleteTopics request
// Request format: client_id + timeout(4) + topics_array // Request format: client_id + timeout(4) + topics_array

Loading…
Cancel
Save