You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
402 lines
10 KiB
402 lines
10 KiB
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
)
|
|
|
|
// FlexibleVersions provides utilities for handling Kafka flexible versions protocol
|
|
// Flexible versions use compact arrays/strings and tagged fields for backward compatibility
|
|
|
|
// CompactArrayLength encodes a length for compact arrays
|
|
// Compact arrays encode length as length+1, where 0 means empty array
|
|
func CompactArrayLength(length uint32) []byte {
|
|
if length == 0 {
|
|
return []byte{0} // Empty array
|
|
}
|
|
return EncodeUvarint(length + 1)
|
|
}
|
|
|
|
// DecodeCompactArrayLength decodes a compact array length
|
|
// Returns the actual length and number of bytes consumed
|
|
func DecodeCompactArrayLength(data []byte) (uint32, int, error) {
|
|
if len(data) == 0 {
|
|
return 0, 0, fmt.Errorf("no data for compact array length")
|
|
}
|
|
|
|
if data[0] == 0 {
|
|
return 0, 1, nil // Empty array
|
|
}
|
|
|
|
length, consumed, err := DecodeUvarint(data)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("decode compact array length: %w", err)
|
|
}
|
|
|
|
if length == 0 {
|
|
return 0, consumed, fmt.Errorf("invalid compact array length encoding")
|
|
}
|
|
|
|
return length - 1, consumed, nil
|
|
}
|
|
|
|
// CompactStringLength encodes a length for compact strings
|
|
// Compact strings encode length as length+1, where 0 means null string
|
|
func CompactStringLength(length int) []byte {
|
|
if length < 0 {
|
|
return []byte{0} // Null string
|
|
}
|
|
return EncodeUvarint(uint32(length + 1))
|
|
}
|
|
|
|
// DecodeCompactStringLength decodes a compact string length
|
|
// Returns the actual length (-1 for null), and number of bytes consumed
|
|
func DecodeCompactStringLength(data []byte) (int, int, error) {
|
|
if len(data) == 0 {
|
|
return 0, 0, fmt.Errorf("no data for compact string length")
|
|
}
|
|
|
|
if data[0] == 0 {
|
|
return -1, 1, nil // Null string
|
|
}
|
|
|
|
length, consumed, err := DecodeUvarint(data)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("decode compact string length: %w", err)
|
|
}
|
|
|
|
if length == 0 {
|
|
return 0, consumed, fmt.Errorf("invalid compact string length encoding")
|
|
}
|
|
|
|
return int(length - 1), consumed, nil
|
|
}
|
|
|
|
// EncodeUvarint encodes an unsigned integer using variable-length
// (LEB128) encoding, as used by compact arrays, compact strings, and
// tagged fields in flexible versions. It returns a 1–5 byte slice.
func EncodeUvarint(value uint32) []byte {
	// Delegate to the standard library: binary.PutUvarint emits the
	// same 7-bits-per-byte, continuation-bit format as the hand-rolled
	// loop this replaces.
	buf := make([]byte, binary.MaxVarintLen32)
	n := binary.PutUvarint(buf, uint64(value))
	return buf[:n]
}
|
|
|
|
// DecodeUvarint decodes a variable-length (LEB128) unsigned integer.
// It returns the decoded value and the number of bytes consumed, or an
// error if the input ends mid-varint or the value overflows 32 bits.
func DecodeUvarint(data []byte) (uint32, int, error) {
	var value uint32
	var shift uint
	consumed := 0

	for _, b := range data {
		consumed++
		// Reject values that do not fit in 32 bits. The original code
		// only checked the shift after OR-ing the byte in, so a fifth
		// byte with data bits above bit 31 (e.g. FF FF FF FF 7F) was
		// silently truncated; check before accumulating instead.
		if shift >= 32 || (shift == 28 && b&0x7F > 0x0F) {
			return 0, consumed, fmt.Errorf("uvarint overflow")
		}
		value |= uint32(b&0x7F) << shift

		// A clear continuation bit marks the final byte.
		if b&0x80 == 0 {
			return value, consumed, nil
		}
		shift += 7
	}

	return 0, consumed, fmt.Errorf("incomplete uvarint")
}
|
|
|
|
// TaggedField represents a tagged field in flexible versions
type TaggedField struct {
	// Tag is the field's numeric identifier (uvarint-encoded on the wire).
	Tag uint32
	// Data is the field's raw payload bytes, stored already encoded.
	Data []byte
}

// TaggedFields represents a collection of tagged fields
type TaggedFields struct {
	// Fields holds the tagged fields in wire order.
	Fields []TaggedField
}
|
|
|
|
// EncodeTaggedFields encodes tagged fields for flexible versions
|
|
func (tf *TaggedFields) Encode() []byte {
|
|
if len(tf.Fields) == 0 {
|
|
return []byte{0} // Empty tagged fields
|
|
}
|
|
|
|
var buf []byte
|
|
|
|
// Number of tagged fields
|
|
buf = append(buf, EncodeUvarint(uint32(len(tf.Fields)))...)
|
|
|
|
for _, field := range tf.Fields {
|
|
// Tag
|
|
buf = append(buf, EncodeUvarint(field.Tag)...)
|
|
// Size
|
|
buf = append(buf, EncodeUvarint(uint32(len(field.Data)))...)
|
|
// Data
|
|
buf = append(buf, field.Data...)
|
|
}
|
|
|
|
return buf
|
|
}
|
|
|
|
// DecodeTaggedFields decodes tagged fields from flexible versions
|
|
func DecodeTaggedFields(data []byte) (*TaggedFields, int, error) {
|
|
if len(data) == 0 {
|
|
return &TaggedFields{}, 0, fmt.Errorf("no data for tagged fields")
|
|
}
|
|
|
|
if data[0] == 0 {
|
|
return &TaggedFields{}, 1, nil // Empty tagged fields
|
|
}
|
|
|
|
offset := 0
|
|
|
|
// Number of tagged fields
|
|
numFields, consumed, err := DecodeUvarint(data[offset:])
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("decode tagged fields count: %w", err)
|
|
}
|
|
offset += consumed
|
|
|
|
fields := make([]TaggedField, numFields)
|
|
|
|
for i := uint32(0); i < numFields; i++ {
|
|
// Tag
|
|
tag, consumed, err := DecodeUvarint(data[offset:])
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("decode tagged field %d tag: %w", i, err)
|
|
}
|
|
offset += consumed
|
|
|
|
// Size
|
|
size, consumed, err := DecodeUvarint(data[offset:])
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("decode tagged field %d size: %w", i, err)
|
|
}
|
|
offset += consumed
|
|
|
|
// Data
|
|
if offset+int(size) > len(data) {
|
|
return nil, 0, fmt.Errorf("tagged field %d data truncated", i)
|
|
}
|
|
|
|
fields[i] = TaggedField{
|
|
Tag: tag,
|
|
Data: data[offset : offset+int(size)],
|
|
}
|
|
offset += int(size)
|
|
}
|
|
|
|
return &TaggedFields{Fields: fields}, offset, nil
|
|
}
|
|
|
|
// IsFlexibleVersion reports whether the given API key uses the flexible
// wire format at the given API version. Adoption is API-specific, so
// each key has its own threshold; unknown keys are treated as
// non-flexible.
func IsFlexibleVersion(apiKey, apiVersion uint16) bool {
	// Cases are grouped by the version at which each API switched to
	// flexible encoding.
	switch apiKey {
	case 19: // CreateTopics
		return apiVersion >= 2
	case 10, 18: // FindCoordinator, ApiVersions
		return apiVersion >= 3
	case 12, 13, 14, 20: // Heartbeat, LeaveGroup, SyncGroup, DeleteTopics
		return apiVersion >= 4
	case 9, 11: // OffsetFetch, JoinGroup
		return apiVersion >= 6
	case 8: // OffsetCommit
		return apiVersion >= 8
	case 0, 3: // Produce, Metadata
		return apiVersion >= 9
	case 1: // Fetch
		return apiVersion >= 12
	}
	return false
}
|
|
|
|
// FlexibleString encodes a string for flexible versions (compact format)
|
|
func FlexibleString(s string) []byte {
|
|
if s == "" {
|
|
return []byte{0} // Null string
|
|
}
|
|
|
|
var buf []byte
|
|
buf = append(buf, CompactStringLength(len(s))...)
|
|
buf = append(buf, []byte(s)...)
|
|
return buf
|
|
}
|
|
|
|
// FlexibleNullableString encodes a nullable string for flexible versions
|
|
func FlexibleNullableString(s *string) []byte {
|
|
if s == nil {
|
|
return []byte{0} // Null string
|
|
}
|
|
return FlexibleString(*s)
|
|
}
|
|
|
|
// DecodeFlexibleString decodes a flexible string
|
|
// Returns the string (empty for null) and bytes consumed
|
|
func DecodeFlexibleString(data []byte) (string, int, error) {
|
|
length, consumed, err := DecodeCompactStringLength(data)
|
|
if err != nil {
|
|
return "", 0, err
|
|
}
|
|
|
|
if length < 0 {
|
|
return "", consumed, nil // Null string -> empty string
|
|
}
|
|
|
|
if consumed+length > len(data) {
|
|
return "", 0, fmt.Errorf("string data truncated")
|
|
}
|
|
|
|
return string(data[consumed : consumed+length]), consumed + length, nil
|
|
}
|
|
|
|
// FlexibleVersionHeader handles the request header parsing for flexible versions
type FlexibleVersionHeader struct {
	// APIKey identifies which Kafka API the request invokes.
	APIKey uint16
	// APIVersion is the API version requested by the client.
	APIVersion uint16
	// CorrelationID is echoed back so the client can match responses.
	CorrelationID uint32
	// ClientID is nil when the client sent a null client id.
	ClientID *string
	// TaggedFields is populated only for flexible-version headers;
	// it stays nil for regular headers.
	TaggedFields *TaggedFields
}
|
|
|
|
// parseRegularHeader parses a regular (non-flexible) Kafka request header
|
|
func parseRegularHeader(data []byte) (*FlexibleVersionHeader, []byte, error) {
|
|
if len(data) < 8 {
|
|
return nil, nil, fmt.Errorf("header too short")
|
|
}
|
|
|
|
header := &FlexibleVersionHeader{}
|
|
offset := 0
|
|
|
|
// API Key (2 bytes)
|
|
header.APIKey = binary.BigEndian.Uint16(data[offset : offset+2])
|
|
offset += 2
|
|
|
|
// API Version (2 bytes)
|
|
header.APIVersion = binary.BigEndian.Uint16(data[offset : offset+2])
|
|
offset += 2
|
|
|
|
// Correlation ID (4 bytes)
|
|
header.CorrelationID = binary.BigEndian.Uint32(data[offset : offset+4])
|
|
offset += 4
|
|
|
|
// Regular versions use standard strings
|
|
if len(data) < offset+2 {
|
|
return nil, nil, fmt.Errorf("missing client_id length")
|
|
}
|
|
|
|
clientIDLen := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
|
|
offset += 2
|
|
|
|
if clientIDLen >= 0 {
|
|
if len(data) < offset+int(clientIDLen) {
|
|
return nil, nil, fmt.Errorf("client_id truncated")
|
|
}
|
|
clientID := string(data[offset : offset+int(clientIDLen)])
|
|
header.ClientID = &clientID
|
|
offset += int(clientIDLen)
|
|
}
|
|
|
|
return header, data[offset:], nil
|
|
}
|
|
|
|
// ParseRequestHeader parses a Kafka request header, handling both regular and flexible versions
|
|
func ParseRequestHeader(data []byte) (*FlexibleVersionHeader, []byte, error) {
|
|
if len(data) < 8 {
|
|
return nil, nil, fmt.Errorf("header too short")
|
|
}
|
|
|
|
header := &FlexibleVersionHeader{}
|
|
offset := 0
|
|
|
|
// API Key (2 bytes)
|
|
header.APIKey = binary.BigEndian.Uint16(data[offset : offset+2])
|
|
offset += 2
|
|
|
|
// API Version (2 bytes)
|
|
header.APIVersion = binary.BigEndian.Uint16(data[offset : offset+2])
|
|
offset += 2
|
|
|
|
// Correlation ID (4 bytes)
|
|
header.CorrelationID = binary.BigEndian.Uint32(data[offset : offset+4])
|
|
offset += 4
|
|
|
|
// Client ID handling depends on flexible version
|
|
isFlexible := IsFlexibleVersion(header.APIKey, header.APIVersion)
|
|
|
|
if isFlexible {
|
|
// Flexible versions use compact strings
|
|
clientID, consumed, err := DecodeFlexibleString(data[offset:])
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("decode flexible client_id: %w", err)
|
|
}
|
|
offset += consumed
|
|
|
|
if clientID != "" {
|
|
header.ClientID = &clientID
|
|
}
|
|
|
|
// Parse tagged fields in header
|
|
taggedFields, consumed, err := DecodeTaggedFields(data[offset:])
|
|
if err != nil {
|
|
// If tagged fields parsing fails, this might be a regular header sent by kafka-go
|
|
// Fall back to regular header parsing
|
|
return parseRegularHeader(data)
|
|
}
|
|
offset += consumed
|
|
header.TaggedFields = taggedFields
|
|
|
|
} else {
|
|
// Regular versions use standard strings
|
|
if len(data) < offset+2 {
|
|
return nil, nil, fmt.Errorf("missing client_id length")
|
|
}
|
|
|
|
clientIDLen := int16(binary.BigEndian.Uint16(data[offset : offset+2]))
|
|
offset += 2
|
|
|
|
if clientIDLen >= 0 {
|
|
if len(data) < offset+int(clientIDLen) {
|
|
return nil, nil, fmt.Errorf("client_id truncated")
|
|
}
|
|
|
|
clientID := string(data[offset : offset+int(clientIDLen)])
|
|
header.ClientID = &clientID
|
|
offset += int(clientIDLen)
|
|
}
|
|
// No tagged fields in regular versions
|
|
}
|
|
|
|
return header, data[offset:], nil
|
|
}
|
|
|
|
// EncodeFlexibleResponse builds a response frame: a 4-byte big-endian
// correlation id, the payload, and — when hasTaggedFields is set — a
// trailing empty tagged-fields marker (single zero byte).
func EncodeFlexibleResponse(correlationID uint32, data []byte, hasTaggedFields bool) []byte {
	// Preallocate the worst-case size: header + payload + marker.
	out := make([]byte, 4, 4+len(data)+1)
	binary.BigEndian.PutUint32(out, correlationID)
	out = append(out, data...)

	if hasTaggedFields {
		out = append(out, 0) // empty tagged fields for flexible responses
	}

	return out
}
|