@ -1335,213 +1335,159 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re
case 0 , 1 :
return h . handleCreateTopicsV0V1 ( correlationID , requestBody )
case 2 , 3 , 4 , 5 :
return h . handleCreateTopicsV2Plus ( correlationID , apiVersion , requestBody )
// Use non-flexible parser for CI clients
return h . handleCreateTopicsV2To4 ( correlationID , requestBody )
default :
return nil , fmt . Errorf ( "unsupported CreateTopics API version: %d" , apiVersion )
}
}
func ( h * Handler ) handleCreateTopicsV2Plus ( correlationID uint32 , apiVersion uint16 , requestBody [ ] byte ) ( [ ] byte , error ) {
// CreateTopics v2+ format:
// topics_array + timeout_ms(4) + validate_only(1) + [tagged_fields]
// handleCreateTopicsV2To4 handles CreateTopics API versions 2-4 (non-flexible, regular arrays/strings)
func ( h * Handler ) handleCreateTopicsV2To4 ( correlationID uint32 , requestBody [ ] byte ) ( [ ] byte , error ) {
// Format (v2-v4):
// topics (ARRAY) + timeout_ms (INT32) + validate_only (BOOLEAN)
offset := 0
// Parse topics array (compact array format in v2+)
if len ( requestBody ) < offset + 1 {
return nil , fmt . Errorf ( "CreateTopics v2+ request missing topics array" )
}
// Read topics count (compact array: length + 1)
topicsCountRaw := requestBody [ offset ]
offset += 1
var topicsCount uint32
if topicsCountRaw == 0 {
topicsCount = 0
} else {
topicsCount = uint32 ( topicsCountRaw ) - 1
}
fmt . Printf ( "DEBUG: CreateTopics v%d - Topics count: %d, remaining bytes: %d\n" , apiVersion , topicsCount , len ( requestBody ) - offset )
// DEBUG: Hex dump to understand request format
dumpLen := len ( requestBody )
if dumpLen > 50 {
dumpLen = 50
}
fmt . Printf ( "DEBUG: CreateTopics v%d request hex dump (first %d bytes): %x\n" , apiVersion , dumpLen , requestBody [ : dumpLen ] )
// Build response
response := make ( [ ] byte , 0 , 256 )
// Correlation ID
correlationIDBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( correlationIDBytes , correlationID )
response = append ( response , correlationIDBytes ... )
// Throttle time (4 bytes, 0 = no throttling)
response = append ( response , 0 , 0 , 0 , 0 )
// Topics array (compact format in v2+: count + 1)
if topicsCount == 0 {
response = append ( response , 0 ) // Empty array
} else {
response = append ( response , byte ( topicsCount + 1 ) ) // Compact array format
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "CreateTopics v2-4 request too short for topics array" )
}
// Process each topic (using SeaweedMQ handler)
for i := uint32 ( 0 ) ; i < topicsCount && offset < len ( requestBody ) ; i ++ {
// Parse topic name (compact string in v2+)
if len ( requestBody ) < offset + 1 {
break
}
topicNameLengthRaw := requestBody [ offset ]
offset += 1
var topicNameLength int
if topicNameLengthRaw == 0 {
topicNameLength = 0
} else {
topicNameLength = int ( topicNameLengthRaw ) - 1
topicsCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
fmt . Printf ( "DEBUG: CreateTopics v2-4 - Topics count: %d, remaining bytes: %d\n" , topicsCount , len ( requestBody ) - offset )
// Parse topics
topics := make ( [ ] struct {
name string
partitions uint32
replication uint16
} , 0 , topicsCount )
for i := uint32 ( 0 ) ; i < topicsCount ; i ++ {
if len ( requestBody ) < offset + 2 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated topic name length" )
}
if len ( requestBody ) < offset + topicNameLength {
break
nameLen := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
offset += 2
if len ( requestBody ) < offset + int ( nameLen ) {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated topic name" )
}
topicName := string ( requestBody [ offset : offset + int ( nameLen ) ] )
offset += int ( nameLen )
topicName := string ( requestBody [ offset : offset + topicNameLength ] )
offset += topicNameLength
// Parse num_partitions (4 bytes)
if len ( requestBody ) < offset + 4 {
b reak
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated num_partitions" )
}
numPartitions := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// Parse replication_factor (2 bytes)
if len ( requestBody ) < offset + 2 {
b reak
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated replication_factor" )
}
replicationFactor := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
replication := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
offset += 2
// Parse configs (compact array in v2+)
if len ( requestBody ) >= offset + 1 {
configsCountRaw := requestBody [ offset ]
offset += 1
var configsCount uint32
if configsCountRaw == 0 {
configsCount = 0
} else {
configsCount = uint32 ( configsCountRaw ) - 1
}
// Skip configs for now (simplified)
for j := uint32 ( 0 ) ; j < configsCount && offset < len ( requestBody ) ; j ++ {
// Skip config name (compact string)
if len ( requestBody ) >= offset + 1 {
configNameLengthRaw := requestBody [ offset ]
offset += 1
if configNameLengthRaw > 0 {
configNameLength := int ( configNameLengthRaw ) - 1
offset += configNameLength
}
}
// Skip config value (compact string)
if len ( requestBody ) >= offset + 1 {
configValueLengthRaw := requestBody [ offset ]
offset += 1
if configValueLengthRaw > 0 {
configValueLength := int ( configValueLengthRaw ) - 1
offset += configValueLength
}
}
}
// Assignments array (array of partition assignments) - skip contents
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated assignments count" )
}
// Skip tagged fields (empty for now)
if len ( requestBody ) >= offset + 1 {
taggedFieldsCount := requestBody [ offset ]
offset += 1
// Skip tagged fields (simplified - should be 0 for basic requests)
for j := 0 ; j < int ( taggedFieldsCount ) ; j ++ {
// Skip tagged field parsing for now
break
assignments := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
for j := uint32 ( 0 ) ; j < assignments ; j ++ {
// partition_id (int32) + replicas (array int32)
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated assignment partition id" )
}
offset += 4
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated replicas count" )
}
replicasCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// skip replica ids
offset += int ( replicasCount ) * 4
}
fmt . Printf ( "DEBUG: Parsed topic: %s, partitions: %d, replication: %d\n" , topicName , numPartitions , replicationFactor )
// Response: topic_name (compact string) + error_code(2) + error_message (compact string)
if len ( topicName ) == 0 {
response = append ( response , 0 ) // Empty string
} else {
response = append ( response , byte ( len ( topicName ) + 1 ) ) // Compact string format
// Configs array (array of (name,value) strings) - skip contents
if len ( requestBody ) < offset + 4 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated configs count" )
}
response = append ( response , [ ] byte ( topicName ) ... )
// Check if topic already exists
var errorCode uint16 = 0
var errorMessage string = ""
// Use SeaweedMQ integration
if h . seaweedMQHandler . TopicExists ( topicName ) {
errorCode = 36 // TOPIC_ALREADY_EXISTS
errorMessage = "Topic already exists"
} else if numPartitions <= 0 {
errorCode = 37 // INVALID_PARTITIONS
errorMessage = "Invalid number of partitions"
} else if replicationFactor <= 0 {
errorCode = 38 // INVALID_REPLICATION_FACTOR
errorMessage = "Invalid replication factor"
} else {
// Create the topic in SeaweedMQ
if err := h . seaweedMQHandler . CreateTopic ( topicName , int32 ( numPartitions ) ) ; err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
errorMessage = err . Error ( )
configs := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
for j := uint32 ( 0 ) ; j < configs ; j ++ {
// name (string)
if len ( requestBody ) < offset + 2 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated config name length" )
}
nameLen := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
offset += 2 + int ( nameLen )
// value (string)
if len ( requestBody ) < offset + 2 {
return nil , fmt . Errorf ( "CreateTopics v2-4: truncated config value length" )
}
valLen := binary . BigEndian . Uint16 ( requestBody [ offset : offset + 2 ] )
offset += 2 + int ( valLen )
}
// Error code
response = append ( response , byte ( errorCode >> 8 ) , byte ( errorCode ) )
// Error message (compact nullable string in v2+)
if errorMessage == "" {
response = append ( response , 0 ) // null string in compact format
} else {
response = append ( response , byte ( len ( errorMessage ) + 1 ) ) // Compact string format
response = append ( response , [ ] byte ( errorMessage ) ... )
}
// Tagged fields (empty)
response = append ( response , 0 )
topics = append ( topics , struct {
name string
partitions uint32
replication uint16
} { topicName , numPartitions , replication } )
}
// Parse timeout_ms and validate_only at the end (after all topics)
// timeout_ms
if len ( requestBody ) >= offset + 4 {
timeoutMs : = binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
_ = binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
fmt . Printf ( "DEBUG: CreateTopics timeout_ms: %d\n" , timeoutMs )
}
// validate_only (boolean)
if len ( requestBody ) >= offset + 1 {
validateOnly : = requestBody [ offset ] != 0
_ = requestBody [ offset ]
offset += 1
fmt . Printf ( "DEBUG: CreateTopics validate_only: %v\n" , validateOnly )
}
// Tagged fields at the end
response = append ( response , 0 )
// Build response
response := make ( [ ] byte , 0 , 128 )
// Correlation ID
cid := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( cid , correlationID )
response = append ( response , cid ... )
// throttle_time_ms (4 bytes)
response = append ( response , 0 , 0 , 0 , 0 )
// topics array count (int32)
countBytes := make ( [ ] byte , 4 )
binary . BigEndian . PutUint32 ( countBytes , uint32 ( len ( topics ) ) )
response = append ( response , countBytes ... )
// per-topic responses
for _ , t := range topics {
// topic name (string)
nameLen := make ( [ ] byte , 2 )
binary . BigEndian . PutUint16 ( nameLen , uint16 ( len ( t . name ) ) )
response = append ( response , nameLen ... )
response = append ( response , [ ] byte ( t . name ) ... )
// error_code (int16)
var errCode uint16 = 0
if h . seaweedMQHandler . TopicExists ( t . name ) {
errCode = 36 // TOPIC_ALREADY_EXISTS
} else if t . partitions == 0 {
errCode = 37 // INVALID_PARTITIONS
} else if t . replication == 0 {
errCode = 38 // INVALID_REPLICATION_FACTOR
} else {
if err := h . seaweedMQHandler . CreateTopic ( t . name , int32 ( t . partitions ) ) ; err != nil {
errCode = 1 // UNKNOWN_SERVER_ERROR
}
}
eb := make ( [ ] byte , 2 )
binary . BigEndian . PutUint16 ( eb , errCode )
response = append ( response , eb ... )
// error_message (nullable string) -> null
response = append ( response , 0xFF , 0xFF )
}
return response , nil
}
// handleCreateTopicsV0V1 handles CreateTopics API versions 0 and 1
func ( h * Handler ) handleCreateTopicsV0V1 ( correlationID uint32 , requestBody [ ] byte ) ( [ ] byte , error ) {
fmt . Printf ( "DEBUG: CreateTopics v0/v1 - parsing request of %d bytes\n" , len ( requestBody ) )