Browse Source

fmt

pull/7231/head
chrislu 2 months ago
parent
commit
9d54b5f569
  1. 90
      weed/mq/kafka/protocol/handler.go

90
weed/mq/kafka/protocol/handler.go

@ -303,73 +303,73 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([
response = append(response, offsetBytes...)
}
}
return response, nil
}
func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal CreateTopics request
// Request format: client_id + timeout(4) + topics_array
if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
return nil, fmt.Errorf("CreateTopics request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
return nil, fmt.Errorf("CreateTopics request missing data")
}
// Skip timeout
offset += 4
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic
h.topicsMu.Lock()
defer h.topicsMu.Unlock()
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize)+12 { // name + num_partitions(4) + replication_factor(2) + configs_count(4) + timeout(4) - simplified
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
// Parse num_partitions and replication_factor (skip others for simplicity)
numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
// Skip configs and remaining fields for simplicity
// In a real implementation, we'd parse these properly
if len(requestBody) >= offset+4 {
@ -387,20 +387,20 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
}
}
}
// Skip timeout field if present
if len(requestBody) >= offset+4 {
offset += 4
}
// Response: topic_name + error_code(2) + error_message
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, []byte(topicName)...)
// Check if topic already exists
var errorCode uint16 = 0
var errorMessage string = ""
if _, exists := h.topics[topicName]; exists {
errorCode = 36 // TOPIC_ALREADY_EXISTS
errorMessage = "Topic already exists"
@ -418,10 +418,10 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
CreatedAt: time.Now().UnixNano(),
}
}
// Error code
response = append(response, byte(errorCode>>8), byte(errorCode))
// Error message (nullable string)
if errorMessage == "" {
response = append(response, 0xFF, 0xFF) // null string
@ -431,75 +431,75 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
response = append(response, []byte(errorMessage)...)
}
}
return response, nil
}
func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
// Parse minimal DeleteTopics request
// Request format: client_id + timeout(4) + topics_array
if len(requestBody) < 6 { // client_id_size(2) + timeout(4)
return nil, fmt.Errorf("DeleteTopics request too short")
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
return nil, fmt.Errorf("DeleteTopics request missing data")
}
// Skip timeout
offset += 4
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time (4 bytes, 0 = no throttling)
response = append(response, 0, 0, 0, 0)
// Topics count (same as request)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
response = append(response, topicsCountBytes...)
// Process each topic
h.topicsMu.Lock()
defer h.topicsMu.Unlock()
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
// Parse topic name
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
offset += 2
if len(requestBody) < offset+int(topicNameSize) {
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
// Response: topic_name + error_code(2) + error_message
response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
response = append(response, []byte(topicName)...)
// Check if topic exists and delete it
var errorCode uint16 = 0
var errorMessage string = ""
if _, exists := h.topics[topicName]; !exists {
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
errorMessage = "Unknown topic"
@ -507,10 +507,10 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
// Delete the topic
delete(h.topics, topicName)
}
// Error code
response = append(response, byte(errorCode>>8), byte(errorCode))
// Error message (nullable string)
if errorMessage == "" {
response = append(response, 0xFF, 0xFF) // null string
@ -520,6 +520,6 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
response = append(response, []byte(errorMessage)...)
}
}
return response, nil
}
Loading…
Cancel
Save