diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index fedab36f6..52fda1456 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -303,73 +303,73 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([ response = append(response, offsetBytes...) } } - + return response, nil } func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse minimal CreateTopics request // Request format: client_id + timeout(4) + topics_array - + if len(requestBody) < 6 { // client_id_size(2) + timeout(4) return nil, fmt.Errorf("CreateTopics request too short") } - + // Skip client_id clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) offset := 2 + int(clientIDSize) - - if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) + + if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) return nil, fmt.Errorf("CreateTopics request missing data") } - + // Skip timeout offset += 4 - + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + response := make([]byte, 0, 256) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) - + // Topics count (same as request) topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) response = append(response, topicsCountBytes...) - + // Process each topic h.topicsMu.Lock() defer h.topicsMu.Unlock() - + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { break } - + // Parse topic name topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - + if len(requestBody) < offset+int(topicNameSize)+12 { // name + num_partitions(4) + replication_factor(2) + configs_count(4) + timeout(4) - simplified break } - + topicName := string(requestBody[offset : offset+int(topicNameSize)]) offset += int(topicNameSize) - + // Parse num_partitions and replication_factor (skip others for simplicity) numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - + // Skip configs and remaining fields for simplicity // In a real implementation, we'd parse these properly if len(requestBody) >= offset+4 { @@ -387,20 +387,20 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( } } } - + // Skip timeout field if present if len(requestBody) >= offset+4 { offset += 4 } - + // Response: topic_name + error_code(2) + error_message response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) response = append(response, []byte(topicName)...) - + // Check if topic already exists var errorCode uint16 = 0 var errorMessage string = "" - + if _, exists := h.topics[topicName]; exists { errorCode = 36 // TOPIC_ALREADY_EXISTS errorMessage = "Topic already exists" @@ -418,10 +418,10 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( CreatedAt: time.Now().UnixNano(), } } - + // Error code response = append(response, byte(errorCode>>8), byte(errorCode)) - + // Error message (nullable string) if errorMessage == "" { response = append(response, 0xFF, 0xFF) // null string @@ -431,75 +431,75 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ( response = append(response, []byte(errorMessage)...) } } - + return response, nil } func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse minimal DeleteTopics request // Request format: client_id + timeout(4) + topics_array - + if len(requestBody) < 6 { // client_id_size(2) + timeout(4) return nil, fmt.Errorf("DeleteTopics request too short") } - + // Skip client_id clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) offset := 2 + int(clientIDSize) - + if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) return nil, fmt.Errorf("DeleteTopics request missing data") } - + // Skip timeout offset += 4 - + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - + response := make([]byte, 0, 256) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time (4 bytes, 0 = no throttling) response = append(response, 0, 0, 0, 0) - + // Topics count (same as request) topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) response = append(response, topicsCountBytes...) - + // Process each topic h.topicsMu.Lock() defer h.topicsMu.Unlock() - + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { break } - + // Parse topic name topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) offset += 2 - + if len(requestBody) < offset+int(topicNameSize) { break } - + topicName := string(requestBody[offset : offset+int(topicNameSize)]) offset += int(topicNameSize) - + // Response: topic_name + error_code(2) + error_message response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) response = append(response, []byte(topicName)...) - + // Check if topic exists and delete it var errorCode uint16 = 0 var errorMessage string = "" - + if _, exists := h.topics[topicName]; !exists { errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION errorMessage = "Unknown topic" @@ -507,10 +507,10 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( // Delete the topic delete(h.topics, topicName) } - + // Error code response = append(response, byte(errorCode>>8), byte(errorCode)) - + // Error message (nullable string) if errorMessage == "" { response = append(response, 0xFF, 0xFF) // null string @@ -520,6 +520,6 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( response = append(response, []byte(errorMessage)...) } } - + return response, nil }