
mq(kafka): implement CreateTopics/DeleteTopics handlers with in-memory topic registry and comprehensive validation; now supports 5 API keys

pull/7231/head
chrislu 2 months ago
commit c7b6103e31
  1. weed/mq/kafka/protocol/handler.go (346)
  2. weed/mq/kafka/protocol/handler_test.go (54)

weed/mq/kafka/protocol/handler.go (346)

@@ -6,14 +6,27 @@ import (
"fmt"
"io"
"net"
"sync"
"time"
)
// TopicInfo holds basic information about a topic
type TopicInfo struct {
Name string
Partitions int32
CreatedAt int64
}
// Handler processes Kafka protocol requests from clients
type Handler struct {
topicsMu sync.RWMutex
topics map[string]*TopicInfo // topic name -> topic info
}
func NewHandler() *Handler {
return &Handler{
topics: make(map[string]*TopicInfo),
}
}
// HandleConn processes a single client connection
@@ -65,6 +78,10 @@ func (h *Handler) HandleConn(conn net.Conn) error {
response, err = h.handleMetadata(correlationID, messageBuf[8:]) // skip header
case 2: // ListOffsets
response, err = h.handleListOffsets(correlationID, messageBuf[8:]) // skip header
case 19: // CreateTopics
response, err = h.handleCreateTopics(correlationID, messageBuf[8:]) // skip header
case 20: // DeleteTopics
response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header
default:
err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion)
}
@@ -93,38 +110,48 @@ func (h *Handler) HandleConn(conn net.Conn) error {
func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
// Build ApiVersions response manually
// Response format: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4)
response := make([]byte, 0, 64)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Error code (0 = no error)
response = append(response, 0, 0)
// Number of API keys (compact array format in newer versions, but using basic format for simplicity)
response = append(response, 0, 0, 0, 5) // 5 API keys
// API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 18) // API key 18
response = append(response, 0, 0) // min version 0
response = append(response, 0, 3) // max version 3
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 7) // max version 7
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
response = append(response, 0, 0) // min version 0
response = append(response, 0, 5) // max version 5
// API Key 19 (CreateTopics): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 19) // API key 19
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
// API Key 20 (DeleteTopics): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 20) // API key 20
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
return response, nil
}
@@ -132,43 +159,43 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]byte, error) {
// For now, ignore the request body content (topics filter, etc.)
// Build minimal Metadata response
// Response format: correlation_id(4) + throttle_time(4) + brokers + cluster_id + controller_id + topics
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host + port(4) + rack
response = append(response, 0, 0, 0, 0) // node_id = 0
// Host string: length(2) + "localhost"
host := "localhost"
response = append(response, 0, byte(len(host)))
response = append(response, []byte(host)...)
// Port (4 bytes) - 9092 (standard Kafka port)
response = append(response, 0, 0, 0x23, 0x84) // 9092 in big-endian
// Rack - nullable string, using null (-1 length)
response = append(response, 0xFF, 0xFF) // null rack
// Cluster ID - nullable string, using null
response = append(response, 0xFF, 0xFF) // null cluster_id
// Controller ID (4 bytes) - -1 (no controller)
response = append(response, 0xFF, 0xFF, 0xFF, 0xFF)
// Topics array length (4 bytes) - 0 topics for now
response = append(response, 0, 0, 0, 0)
return response, nil
}
@@ -176,101 +203,101 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal request to understand what's being asked
// For this stub, we'll just return stub responses for any requested topic/partition
// Request format after client_id: topics_array
if len(requestBody) < 6 { // at minimum need client_id_size(2) + topics_count(4)
return nil, fmt.Errorf("ListOffsets request too short")
}
// Skip client_id: client_id_size(2) + client_id_data
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("ListOffsets request missing topics count")
}
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each requested topic
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize)+4 {
break
}
topicName := requestBody[offset : offset+int(topicNameSize)]
offset += int(topicNameSize)
// Parse partitions count for this topic
partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Response: topic_name_size(2) + topic_name + partitions_array
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, topicName...)
partitionsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
response = append(response, partitionsCountBytes...)
// Process each partition
for j := uint32(0); j < partitionsCount && offset+12 <= len(requestBody); j++ {
// Parse partition request: partition_id(4) + timestamp(8)
partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
timestamp := int64(binary.BigEndian.Uint64(requestBody[offset+4 : offset+12]))
offset += 12
// Response: partition_id(4) + error_code(2) + timestamp(8) + offset(8)
partitionIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
response = append(response, partitionIDBytes...)
// Error code (0 = no error)
response = append(response, 0, 0)
// For stub: return the original timestamp for timestamp queries, or current time for earliest/latest
var responseTimestamp int64
var responseOffset int64
switch timestamp {
case -2: // earliest offset
responseTimestamp = 0
responseOffset = 0
case -1: // latest offset
responseTimestamp = 1000000000 // some timestamp
responseOffset = 0 // stub: no messages yet
default: // specific timestamp
responseTimestamp = timestamp
responseOffset = 0 // stub: no messages at any timestamp
}
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(responseTimestamp))
response = append(response, timestampBytes...)
offsetBytes := make([]byte, 8)
binary.BigEndian.PutUint64(offsetBytes, uint64(responseOffset))
response = append(response, offsetBytes...)
@@ -279,3 +306,220 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([]byte, error) {
return response, nil
}
func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal CreateTopics request
// Request format: client_id + timeout(4) + topics_array
if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
return nil, fmt.Errorf("CreateTopics request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
return nil, fmt.Errorf("CreateTopics request missing data")
}
// Skip timeout
offset += 4
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic
h.topicsMu.Lock()
defer h.topicsMu.Unlock()
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize)+12 { // name + num_partitions(4) + replication_factor(2) + configs_count(4) + timeout(4) - simplified
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
// Parse num_partitions and replication_factor (skip others for simplicity)
numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
// Skip configs and remaining fields for simplicity
// In a real implementation, we'd parse these properly
if len(requestBody) >= offset+4 {
configsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Skip configs (simplified)
for j := uint32(0); j < configsCount && offset+6 <= len(requestBody); j++ {
if len(requestBody) >= offset+2 {
configNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2 + int(configNameSize)
if len(requestBody) >= offset+2 {
configValueSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2 + int(configValueSize)
}
}
}
}
// Skip timeout field if present
if len(requestBody) >= offset+4 {
offset += 4
}
// Response: topic_name + error_code(2) + error_message
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, []byte(topicName)...)
// Check if topic already exists
var errorCode uint16 = 0
var errorMessage string = ""
if _, exists := h.topics[topicName]; exists {
errorCode = 36 // TOPIC_ALREADY_EXISTS
errorMessage = "Topic already exists"
} else if numPartitions <= 0 {
errorCode = 37 // INVALID_PARTITIONS
errorMessage = "Invalid number of partitions"
} else if replicationFactor <= 0 {
errorCode = 38 // INVALID_REPLICATION_FACTOR
errorMessage = "Invalid replication factor"
} else {
// Create the topic
h.topics[topicName] = &TopicInfo{
Name: topicName,
Partitions: int32(numPartitions),
CreatedAt: time.Now().UnixNano(),
}
}
// Error code
response = append(response, byte(errorCode>>8), byte(errorCode))
// Error message (nullable string)
if errorMessage == "" {
response = append(response, 0xFF, 0xFF) // null string
} else {
errorMsgLen := uint16(len(errorMessage))
response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
response = append(response, []byte(errorMessage)...)
}
}
return response, nil
}
func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal DeleteTopics request
// Request format: client_id + timeout(4) + topics_array
if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
return nil, fmt.Errorf("DeleteTopics request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
return nil, fmt.Errorf("DeleteTopics request missing data")
}
// Skip timeout
offset += 4
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic
h.topicsMu.Lock()
defer h.topicsMu.Unlock()
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize) {
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
// Response: topic_name + error_code(2) + error_message
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, []byte(topicName)...)
// Check if topic exists and delete it
var errorCode uint16 = 0
var errorMessage string = ""
if _, exists := h.topics[topicName]; !exists {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
errorMessage = "Unknown topic"
} else {
// Delete the topic
delete(h.topics, topicName)
}
// Error code
response = append(response, byte(errorCode>>8), byte(errorCode))
// Error message (nullable string)
if errorMessage == "" {
response = append(response, 0xFF, 0xFF) // null string
} else {
errorMsgLen := uint16(len(errorMessage))
response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen))
response = append(response, []byte(errorMessage)...)
}
}
return response, nil
}

weed/mq/kafka/protocol/handler_test.go (54)

@@ -92,12 +92,12 @@ func TestHandler_ApiVersions(t *testing.T) {
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(respBuf[6:10])
if numAPIKeys != 5 {
t.Errorf("expected 5 API keys, got: %d", numAPIKeys)
}
// Check API key details: api_key(2) + min_version(2) + max_version(2)
if len(respBuf) < 40 { // need space for 5 API keys
t.Fatalf("response too short for API key data")
}
@@ -145,6 +145,36 @@ func TestHandler_ApiVersions(t *testing.T) {
if maxVersion3 != 5 {
t.Errorf("expected max version 5, got: %d", maxVersion3)
}
// Fourth API key (CreateTopics)
apiKey4 := binary.BigEndian.Uint16(respBuf[28:30])
minVersion4 := binary.BigEndian.Uint16(respBuf[30:32])
maxVersion4 := binary.BigEndian.Uint16(respBuf[32:34])
if apiKey4 != 19 {
t.Errorf("expected API key 19, got: %d", apiKey4)
}
if minVersion4 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion4)
}
if maxVersion4 != 4 {
t.Errorf("expected max version 4, got: %d", maxVersion4)
}
// Fifth API key (DeleteTopics)
apiKey5 := binary.BigEndian.Uint16(respBuf[34:36])
minVersion5 := binary.BigEndian.Uint16(respBuf[36:38])
maxVersion5 := binary.BigEndian.Uint16(respBuf[38:40])
if apiKey5 != 20 {
t.Errorf("expected API key 20, got: %d", apiKey5)
}
if minVersion5 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion5)
}
if maxVersion5 != 4 {
t.Errorf("expected max version 4, got: %d", maxVersion5)
}
// Close client to end handler
client.Close()
@@ -169,7 +199,7 @@ func TestHandler_handleApiVersions(t *testing.T) {
t.Fatalf("handleApiVersions: %v", err)
}
if len(response) < 42 { // minimum expected size (now has 5 API keys)
t.Fatalf("response too short: %d bytes", len(response))
}
@@ -187,8 +217,8 @@ func TestHandler_handleApiVersions(t *testing.T) {
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(response[6:10])
if numAPIKeys != 5 {
t.Errorf("expected 5 API keys, got: %d", numAPIKeys)
}
// Check first API key (ApiVersions)
@@ -208,6 +238,18 @@ func TestHandler_handleApiVersions(t *testing.T) {
if apiKey3 != 2 {
t.Errorf("third API key: got %d, want 2", apiKey3)
}
// Check fourth API key (CreateTopics)
apiKey4 := binary.BigEndian.Uint16(response[28:30])
if apiKey4 != 19 {
t.Errorf("fourth API key: got %d, want 19", apiKey4)
}
// Check fifth API key (DeleteTopics)
apiKey5 := binary.BigEndian.Uint16(response[34:36])
if apiKey5 != 20 {
t.Errorf("fifth API key: got %d, want 20", apiKey5)
}
}
func TestHandler_handleMetadata(t *testing.T) {
