diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 933788b5a..fedab36f6 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -6,14 +6,27 @@ import ( "fmt" "io" "net" + "sync" + "time" ) +// TopicInfo holds basic information about a topic +type TopicInfo struct { + Name string + Partitions int32 + CreatedAt int64 +} + // Handler processes Kafka protocol requests from clients type Handler struct { + topicsMu sync.RWMutex + topics map[string]*TopicInfo // topic name -> topic info } func NewHandler() *Handler { - return &Handler{} + return &Handler{ + topics: make(map[string]*TopicInfo), + } } // HandleConn processes a single client connection @@ -65,6 +78,10 @@ func (h *Handler) HandleConn(conn net.Conn) error { response, err = h.handleMetadata(correlationID, messageBuf[8:]) // skip header case 2: // ListOffsets response, err = h.handleListOffsets(correlationID, messageBuf[8:]) // skip header + case 19: // CreateTopics + response, err = h.handleCreateTopics(correlationID, messageBuf[8:]) // skip header + case 20: // DeleteTopics + response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header default: err = fmt.Errorf("unsupported API key: %d (version %d)", apiKey, apiVersion) } @@ -93,38 +110,48 @@ func (h *Handler) HandleConn(conn net.Conn) error { func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // Build ApiVersions response manually // Response format: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4) - + response := make([]byte, 0, 64) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Error code (0 = no error) response = append(response, 0, 0) - + // Number of API keys (compact array format in newer versions, but using basic format for simplicity) - response = append(response, 0, 0, 0, 3) // 3 API keys - + response = append(response, 0, 0, 0, 5) // 5 API keys + // API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 18) // API key 18 response = append(response, 0, 0) // min version 0 response = append(response, 0, 3) // max version 3 - + // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) - response = append(response, 0, 3) // API key 3 - response = append(response, 0, 0) // min version 0 - response = append(response, 0, 7) // max version 7 - + response = append(response, 0, 3) // API key 3 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 7) // max version 7 + // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) - response = append(response, 0, 2) // API key 2 + response = append(response, 0, 2) // API key 2 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 5) // max version 5 + + // API Key 19 (CreateTopics): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 19) // API key 19 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 5) // max version 5 - + response = append(response, 0, 4) // max version 4 + + // API Key 20 (DeleteTopics): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 20) // API key 20 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 4) // max version 4 + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) - + return response, nil } @@ -132,43 +159,43 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // For now, ignore the request body content (topics filter, etc.) // Build minimal Metadata response // Response format: correlation_id(4) + throttle_time(4) + brokers + cluster_id + controller_id + topics - + response := make([]byte, 0, 256) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) - + // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1) - + // Broker 0: node_id(4) + host + port(4) + rack response = append(response, 0, 0, 0, 0) // node_id = 0 - + // Host string: length(2) + "localhost" host := "localhost" response = append(response, 0, byte(len(host))) response = append(response, []byte(host)...) - + // Port (4 bytes) - 9092 (standard Kafka port) response = append(response, 0, 0, 0x23, 0x84) // 9092 in big-endian - + // Rack - nullable string, using null (-1 length) response = append(response, 0xFF, 0xFF) // null rack - + // Cluster ID - nullable string, using null response = append(response, 0xFF, 0xFF) // null cluster_id - + // Controller ID (4 bytes) - -1 (no controller) response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) - + // Topics array length (4 bytes) - 0 topics for now response = append(response, 0, 0, 0, 0) - + return response, nil } @@ -176,101 +203,101 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ // Parse minimal request to understand what's being asked // For this stub, we'll just return stub responses for any requested topic/partition // Request format after client_id: topics_array - + if len(requestBody) < 6 { // at minimum need client_id_size(2) + topics_count(4) return nil, fmt.Errorf("ListOffsets request too short") } - + // Skip client_id: client_id_size(2) + client_id_data clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) offset := 2 + int(clientIDSize) - + if len(requestBody) < offset+4 { return nil, fmt.Errorf("ListOffsets request missing topics count") } - + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + response := make([]byte, 0, 256) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) - + // Topics count (same as request) topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) response = append(response, topicsCountBytes...) - + // Process each requested topic for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { break } - + // Parse topic name topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - + if len(requestBody) < offset+int(topicNameSize)+4 { break } - + topicName := requestBody[offset : offset+int(topicNameSize)] offset += int(topicNameSize) - + // Parse partitions count for this topic partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + // Response: topic_name_size(2) + topic_name + partitions_array response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) response = append(response, topicName...) - + partitionsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount) response = append(response, partitionsCountBytes...) - + // Process each partition for j := uint32(0); j < partitionsCount && offset+12 <= len(requestBody); j++ { // Parse partition request: partition_id(4) + timestamp(8) partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4]) timestamp := int64(binary.BigEndian.Uint64(requestBody[offset+4 : offset+12])) offset += 12 - + // Response: partition_id(4) + error_code(2) + timestamp(8) + offset(8) partitionIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(partitionIDBytes, partitionID) response = append(response, partitionIDBytes...) - + // Error code (0 = no error) response = append(response, 0, 0) - + // For stub: return the original timestamp for timestamp queries, or current time for earliest/latest var responseTimestamp int64 var responseOffset int64 - + switch timestamp { case -2: // earliest offset responseTimestamp = 0 responseOffset = 0 - case -1: // latest offset + case -1: // latest offset responseTimestamp = 1000000000 // some timestamp responseOffset = 0 // stub: no messages yet default: // specific timestamp responseTimestamp = timestamp responseOffset = 0 // stub: no messages at any timestamp } - + timestampBytes := make([]byte, 8) binary.BigEndian.PutUint64(timestampBytes, uint64(responseTimestamp)) response = append(response, timestampBytes...) - + offsetBytes := make([]byte, 8) binary.BigEndian.PutUint64(offsetBytes, uint64(responseOffset)) response = append(response, offsetBytes...) @@ -279,3 +306,220 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ return response, nil } + +func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse minimal CreateTopics request + // Request format: client_id + timeout(4) + topics_array + + if len(requestBody) < 6 { // client_id_size(2) + timeout(4) + return nil, fmt.Errorf("CreateTopics request too short") + } + + // Skip client_id + clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) + offset := 2 + int(clientIDSize) + + if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) + return nil, fmt.Errorf("CreateTopics request missing data") + } + + // Skip timeout + offset += 4 + + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + response := make([]byte, 0, 256) + + // Correlation ID + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) + + // Throttle time (4 bytes, 0 = no throttling) + response = append(response, 0, 0, 0, 0) + + // Topics count (same as request) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) + + // Process each topic + h.topicsMu.Lock() + defer h.topicsMu.Unlock() + + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + if len(requestBody) < offset+2 { + break + } + + // Parse topic name + topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameSize)+12 { // name + num_partitions(4) + replication_factor(2) + configs_count(4) + timeout(4) - simplified + break + } + + topicName := string(requestBody[offset : offset+int(topicNameSize)]) + offset += int(topicNameSize) + + // Parse num_partitions and replication_factor (skip others for simplicity) + numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + // Skip configs and remaining fields for simplicity + // In a real implementation, we'd parse these properly + if len(requestBody) >= offset+4 { + configsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + // Skip configs (simplified) + for j := uint32(0); j < configsCount && offset+6 <= len(requestBody); j++ { + if len(requestBody) >= offset+2 { + configNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(configNameSize) + if len(requestBody) >= offset+2 { + configValueSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(configValueSize) + } + } + } + } + + // Skip timeout field if present + if len(requestBody) >= offset+4 { + offset += 4 + } + + // Response: topic_name + error_code(2) + error_message + response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) + response = append(response, []byte(topicName)...) + + // Check if topic already exists + var errorCode uint16 = 0 + var errorMessage string = "" + + if _, exists := h.topics[topicName]; exists { + errorCode = 36 // TOPIC_ALREADY_EXISTS + errorMessage = "Topic already exists" + } else if numPartitions <= 0 { + errorCode = 37 // INVALID_PARTITIONS + errorMessage = "Invalid number of partitions" + } else if replicationFactor <= 0 { + errorCode = 38 // INVALID_REPLICATION_FACTOR + errorMessage = "Invalid replication factor" + } else { + // Create the topic + h.topics[topicName] = &TopicInfo{ + Name: topicName, + Partitions: int32(numPartitions), + CreatedAt: time.Now().UnixNano(), + } + } + + // Error code + response = append(response, byte(errorCode>>8), byte(errorCode)) + + // Error message (nullable string) + if errorMessage == "" { + response = append(response, 0xFF, 0xFF) // null string + } else { + errorMsgLen := uint16(len(errorMessage)) + response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen)) + response = append(response, []byte(errorMessage)...) + } + } + + return response, nil +} + +func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse minimal DeleteTopics request + // Request format: client_id + timeout(4) + topics_array + + if len(requestBody) < 6 { // client_id_size(2) + timeout(4) + return nil, fmt.Errorf("DeleteTopics request too short") + } + + // Skip client_id + clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) + offset := 2 + int(clientIDSize) + + if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) + return nil, fmt.Errorf("DeleteTopics request missing data") + } + + // Skip timeout + offset += 4 + + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + response := make([]byte, 0, 256) + + // Correlation ID + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) + + // Throttle time (4 bytes, 0 = no throttling) + response = append(response, 0, 0, 0, 0) + + // Topics count (same as request) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) + + // Process each topic + h.topicsMu.Lock() + defer h.topicsMu.Unlock() + + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + if len(requestBody) < offset+2 { + break + } + + // Parse topic name + topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameSize) { + break + } + + topicName := string(requestBody[offset : offset+int(topicNameSize)]) + offset += int(topicNameSize) + + // Response: topic_name + error_code(2) + error_message + response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) + response = append(response, []byte(topicName)...) + + // Check if topic exists and delete it + var errorCode uint16 = 0 + var errorMessage string = "" + + if _, exists := h.topics[topicName]; !exists { + errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION + errorMessage = "Unknown topic" + } else { + // Delete the topic + delete(h.topics, topicName) + } + + // Error code + response = append(response, byte(errorCode>>8), byte(errorCode)) + + // Error message (nullable string) + if errorMessage == "" { + response = append(response, 0xFF, 0xFF) // null string + } else { + errorMsgLen := uint16(len(errorMessage)) + response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen)) + response = append(response, []byte(errorMessage)...) + } + } + + return response, nil +} diff --git a/weed/mq/kafka/protocol/handler_test.go b/weed/mq/kafka/protocol/handler_test.go index 1ed58033d..3d881fc86 100644 --- a/weed/mq/kafka/protocol/handler_test.go +++ b/weed/mq/kafka/protocol/handler_test.go @@ -92,12 +92,12 @@ func TestHandler_ApiVersions(t *testing.T) { // Check number of API keys numAPIKeys := binary.BigEndian.Uint32(respBuf[6:10]) - if numAPIKeys != 3 { - t.Errorf("expected 3 API keys, got: %d", numAPIKeys) + if numAPIKeys != 5 { + t.Errorf("expected 5 API keys, got: %d", numAPIKeys) } // Check API key details: api_key(2) + min_version(2) + max_version(2) - if len(respBuf) < 28 { // need space for 3 API keys + if len(respBuf) < 40 { // need space for 5 API keys t.Fatalf("response too short for API key data") } @@ -145,6 +145,36 @@ func TestHandler_ApiVersions(t *testing.T) { if maxVersion3 != 5 { t.Errorf("expected max version 5, got: %d", maxVersion3) } + + // Fourth API key (CreateTopics) + apiKey4 := binary.BigEndian.Uint16(respBuf[28:30]) + minVersion4 := binary.BigEndian.Uint16(respBuf[30:32]) + maxVersion4 := binary.BigEndian.Uint16(respBuf[32:34]) + + if apiKey4 != 19 { + t.Errorf("expected API key 19, got: %d", apiKey4) + } + if minVersion4 != 0 { + t.Errorf("expected min version 0, got: %d", minVersion4) + } + if maxVersion4 != 4 { + t.Errorf("expected max version 4, got: %d", maxVersion4) + } + + // Fifth API key (DeleteTopics) + apiKey5 := binary.BigEndian.Uint16(respBuf[34:36]) + minVersion5 := binary.BigEndian.Uint16(respBuf[36:38]) + maxVersion5 := binary.BigEndian.Uint16(respBuf[38:40]) + + if apiKey5 != 20 { + t.Errorf("expected API key 20, got: %d", apiKey5) + } + if minVersion5 != 0 { + t.Errorf("expected min version 0, got: %d", minVersion5) + } + if maxVersion5 != 4 { + t.Errorf("expected max version 4, got: %d", maxVersion5) + } // Close client to end handler client.Close() @@ -169,7 +199,7 @@ func TestHandler_handleApiVersions(t *testing.T) { t.Fatalf("handleApiVersions: %v", err) } - if len(response) < 30 { // minimum expected size (now has 3 API keys) + if len(response) < 42 { // minimum expected size (now has 5 API keys) t.Fatalf("response too short: %d bytes", len(response)) } @@ -187,8 +217,8 @@ func TestHandler_handleApiVersions(t *testing.T) { // Check number of API keys numAPIKeys := binary.BigEndian.Uint32(response[6:10]) - if numAPIKeys != 3 { - t.Errorf("expected 3 API keys, got: %d", numAPIKeys) + if numAPIKeys != 5 { + t.Errorf("expected 5 API keys, got: %d", numAPIKeys) } // Check first API key (ApiVersions) @@ -208,6 +238,18 @@ func TestHandler_handleApiVersions(t *testing.T) { if apiKey3 != 2 { t.Errorf("third API key: got %d, want 2", apiKey3) } + + // Check fourth API key (CreateTopics) + apiKey4 := binary.BigEndian.Uint16(response[28:30]) + if apiKey4 != 19 { + t.Errorf("fourth API key: got %d, want 19", apiKey4) + } + + // Check fifth API key (DeleteTopics) + apiKey5 := binary.BigEndian.Uint16(response[34:36]) + if apiKey5 != 20 { + t.Errorf("fifth API key: got %d, want 20", apiKey5) + } } func TestHandler_handleMetadata(t *testing.T) {