
mq(kafka): Fix CreateTopics v2 request parsing - Phase 4 progress

- Fixed CreateTopics v2 request parsing (was reading wrong offset)
- kafka-go sends CreateTopics v2, not the v0 format we had implemented
- Removed incorrect timeout field parsing for v2 format
- Topics count now parses correctly (was 1274981, now 1)
- Response size increased from 12 to 37 bytes (topics are now actually processed)
- Added detailed debug logging for protocol analysis
- Added hex dump capability to analyze request structure
- Still working on v2 response format compatibility

This fixes the critical parsing bug where we were reading the topics count
from inside the client ID string because of wrong assumptions about the v2 format.
Next: Fix v2 response format for full CreateTopics compatibility.
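
For reference, here is a minimal, self-contained sketch of the offset problem the message describes. It is illustrative only: the function and variable names are made up, and this is not the gateway's handler code. Under the old v0-style assumption the parser skipped a 4-byte timeout field that is not at that position in the v2 layout, so the 4 bytes it then read as the topics count came from neighbouring fields; reading the count immediately after the client ID yields the expected value.

package main

import (
	"encoding/binary"
	"fmt"
)

// parseTopicsCount shows where the topics count ends up being read from,
// depending on whether a 4-byte timeout is assumed before the topics array
// (old v0-style assumption) or not (the v2-style fix described above).
func parseTopicsCount(body []byte, assumeTimeoutBeforeTopics bool) (uint32, error) {
	if len(body) < 2 {
		return 0, fmt.Errorf("request too short for client_id length")
	}
	clientIDLen := int(binary.BigEndian.Uint16(body[0:2]))
	offset := 2 + clientIDLen // skip client_id (2-byte length + bytes)
	if assumeTimeoutBeforeTopics {
		offset += 4 // old assumption: timeout(4) precedes the topics array
	}
	if len(body) < offset+4 {
		return 0, fmt.Errorf("request too short for topics count at offset %d", offset)
	}
	return binary.BigEndian.Uint32(body[offset : offset+4]), nil
}

func main() {
	// Fabricated request body fragment: client_id "debug-client", topics count 1,
	// then the start of a topic entry ("t", 1 partition).
	body := append([]byte{0, 12}, []byte("debug-client")...)
	body = append(body, 0, 0, 0, 1) // topics count = 1
	body = append(body, 0, 1, 't')  // topic name "t"
	body = append(body, 0, 0, 0, 1) // num_partitions = 1

	right, _ := parseTopicsCount(body, false)
	wrong, _ := parseTopicsCount(body, true)
	fmt.Println(right, wrong) // 1 vs. a garbage value read 4 bytes too far
}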
pull/7231/head
chrislu, 2 months ago
commit a0426ff2ac
  1. test/kafka/client_integration_test.go (369)
  2. test/kafka/debug_connection_test.go (241)
  3. weed/mq/kafka/protocol/handler.go (36)

test/kafka/client_integration_test.go (369)

@@ -0,0 +1,369 @@
package kafka

import (
	"context"
	"fmt"
	"net"
	"strings"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)

// TestKafkaGoClient_BasicProduceConsume tests our gateway with real kafka-go client
func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",   // Use random port
		UseSeaweedMQ: false,  // Use in-memory mode for testing
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	// Get the actual address
	brokerAddr := srv.Addr()
	t.Logf("Gateway running on %s", brokerAddr)

	// Create topic first
	topicName := "test-kafka-go-topic"
	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
		t.Fatalf("Failed to create topic: %v", err)
	}

	// Test basic produce
	messages := []kafka.Message{
		{
			Key:   []byte("key1"),
			Value: []byte("Hello, Kafka Gateway!"),
		},
		{
			Key:   []byte("key2"),
			Value: []byte("This is message 2"),
		},
		{
			Key:   []byte("key3"),
			Value: []byte("Final test message"),
		},
	}
	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
		t.Fatalf("Failed to produce messages: %v", err)
	}

	// Test basic consume
	consumedMessages, err := consumeMessages(brokerAddr, topicName, len(messages))
	if err != nil {
		t.Fatalf("Failed to consume messages: %v", err)
	}

	// Validate consumed messages
	if len(consumedMessages) != len(messages) {
		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
	}
	for i, msg := range consumedMessages {
		if i < len(messages) {
			expectedValue := string(messages[i].Value)
			actualValue := string(msg.Value)
			if actualValue != expectedValue {
				t.Errorf("Message %d: expected value %q, got %q", i, expectedValue, actualValue)
			}
		}
	}
	t.Logf("Successfully produced and consumed %d messages", len(consumedMessages))
}

// TestKafkaGoClient_ConsumerGroups tests consumer group functionality
func TestKafkaGoClient_ConsumerGroups(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",
		UseSeaweedMQ: false,
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	brokerAddr := srv.Addr()
	topicName := "test-consumer-group-topic"
	groupID := "test-consumer-group"

	// Create topic
	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
		t.Fatalf("Failed to create topic: %v", err)
	}

	// Produce some messages
	messages := []kafka.Message{
		{Value: []byte("group-message-1")},
		{Value: []byte("group-message-2")},
		{Value: []byte("group-message-3")},
	}
	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
		t.Fatalf("Failed to produce messages: %v", err)
	}

	// Test consumer group
	consumedMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, len(messages))
	if err != nil {
		t.Fatalf("Failed to consume with group: %v", err)
	}
	if len(consumedMessages) != len(messages) {
		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
	}
	t.Logf("Consumer group successfully consumed %d messages", len(consumedMessages))
}

// TestKafkaGoClient_MultiplePartitions tests behavior with multiple partitions
func TestKafkaGoClient_MultiplePartitions(t *testing.T) {
	t.Skip("TODO: Enable once partition support is improved")
	// This test will be enabled once we fix partition handling
	// For now, our implementation assumes single partition per topic
}

// TestKafkaGoClient_OffsetManagement tests offset commit/fetch operations
func TestKafkaGoClient_OffsetManagement(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",
		UseSeaweedMQ: false,
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	brokerAddr := srv.Addr()
	topicName := "test-offset-topic"
	groupID := "test-offset-group"

	// Create topic
	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
		t.Fatalf("Failed to create topic: %v", err)
	}

	// Produce messages
	messages := []kafka.Message{
		{Value: []byte("offset-message-1")},
		{Value: []byte("offset-message-2")},
		{Value: []byte("offset-message-3")},
		{Value: []byte("offset-message-4")},
		{Value: []byte("offset-message-5")},
	}
	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
		t.Fatalf("Failed to produce messages: %v", err)
	}

	// Consume only first 3 messages and commit offset
	partialMessages, err := consumeWithGroupAndCommit(brokerAddr, topicName, groupID, 3)
	if err != nil {
		t.Fatalf("Failed to consume with offset commit: %v", err)
	}
	if len(partialMessages) != 3 {
		t.Errorf("Expected 3 messages, got %d", len(partialMessages))
	}

	// Create new consumer with same group ID - should resume from committed offset
	remainingMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, 2)
	if err != nil {
		t.Fatalf("Failed to consume remaining messages: %v", err)
	}
	if len(remainingMessages) != 2 {
		t.Errorf("Expected 2 remaining messages, got %d", len(remainingMessages))
	}
	t.Logf("Offset management test passed: consumed %d + %d messages",
		len(partialMessages), len(remainingMessages))
}

// Helper functions

func createTopicWithKafkaGo(brokerAddr, topicName string) error {
	// Create connection with timeout
	dialer := &kafka.Dialer{
		Timeout:   5 * time.Second,
		DualStack: true,
	}
	conn, err := dialer.Dial("tcp", brokerAddr)
	if err != nil {
		return fmt.Errorf("dial broker: %w", err)
	}
	defer conn.Close()

	// Set read/write deadlines for debugging
	conn.SetDeadline(time.Now().Add(10 * time.Second))
	fmt.Printf("DEBUG: Connected to broker at %s\n", brokerAddr)

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topicName,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}
	fmt.Printf("DEBUG: Creating topic %s with 1 partition\n", topicName)
	err = conn.CreateTopics(topicConfigs...)
	if err != nil {
		return fmt.Errorf("create topic: %w", err)
	}
	fmt.Printf("DEBUG: Topic %s created successfully\n", topicName)
	return nil
}

func produceMessages(brokerAddr, topicName string, messages []kafka.Message) error {
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokerAddr),
		Topic:    topicName,
		Balancer: &kafka.LeastBytes{},
		// Enable detailed logging for debugging protocol issues
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
				fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
			}
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
		}),
	}
	defer writer.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return writer.WriteMessages(ctx, messages...)
}

func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{brokerAddr},
		Topic:   topicName,
		// Start from the beginning
		StartOffset: kafka.FirstOffset,
		// Enable detailed logging for debugging protocol issues
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
				fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
			}
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
		}),
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
	}
	return messages, nil
}

func consumeWithGroup(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{brokerAddr},
		Topic:   topicName,
		GroupID: groupID,
		// Enable detailed logging for debugging protocol issues
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
				fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
			}
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
		}),
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
	}
	return messages, nil
}

func consumeWithGroupAndCommit(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{brokerAddr},
		Topic:   topicName,
		GroupID: groupID,
		// Enable detailed logging for debugging protocol issues
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
				fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
			}
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
		}),
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var messages []kafka.Message
	for i := 0; i < expectedCount; i++ {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			return messages, fmt.Errorf("read message %d: %w", i, err)
		}
		messages = append(messages, msg)
		// Commit the message
		if err := reader.CommitMessages(ctx, msg); err != nil {
			return messages, fmt.Errorf("commit message %d: %w", i, err)
		}
	}
	return messages, nil
}

// waitForPort waits for a TCP port to become available
func waitForPort(addr string) error {
	for i := 0; i < 50; i++ { // Wait up to 5 seconds
		conn, err := net.Dial("tcp", addr)
		if err == nil {
			conn.Close()
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("port %s not available after 5 seconds", addr)
}

test/kafka/debug_connection_test.go (241)

@@ -0,0 +1,241 @@
package kafka

import (
	"encoding/binary"
	"net"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)

// TestGateway_BasicConnection tests if the gateway can handle basic TCP connections
func TestGateway_BasicConnection(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",
		UseSeaweedMQ: false,
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	brokerAddr := srv.Addr()
	t.Logf("Gateway running on %s", brokerAddr)

	// Test basic TCP connection
	conn, err := net.Dial("tcp", brokerAddr)
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	t.Logf("Successfully connected to gateway")
}

// TestGateway_ApiVersionsRequest tests if we can send an ApiVersions request
func TestGateway_ApiVersionsRequest(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",
		UseSeaweedMQ: false,
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	brokerAddr := srv.Addr()
	t.Logf("Gateway running on %s", brokerAddr)

	// Create connection
	conn, err := net.Dial("tcp", brokerAddr)
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// Set read timeout
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))

	// Build ApiVersions request (API key 18, version 0)
	// Request format: message_size(4) + api_key(2) + api_version(2) + correlation_id(4) + client_id(2+string)
	correlationID := uint32(1)
	clientID := "debug-client"

	request := make([]byte, 0, 64)

	// Build message body first (without size)
	msgBody := make([]byte, 0, 32)
	msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions)
	msgBody = append(msgBody, 0, 0)  // API version 0

	// Correlation ID
	correlationBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationBytes, correlationID)
	msgBody = append(msgBody, correlationBytes...)

	// Client ID string
	clientIDBytes := []byte(clientID)
	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
	msgBody = append(msgBody, clientIDBytes...)

	// Message size (4 bytes) + message body
	sizeBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
	request = append(request, sizeBytes...)
	request = append(request, msgBody...)

	t.Logf("Sending ApiVersions request: %d bytes", len(request))

	// Send request
	_, err = conn.Write(request)
	if err != nil {
		t.Fatalf("Failed to write request: %v", err)
	}

	// Read response size
	var responseSizeBytes [4]byte
	_, err = conn.Read(responseSizeBytes[:])
	if err != nil {
		t.Fatalf("Failed to read response size: %v", err)
	}
	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
	t.Logf("Response size: %d bytes", responseSize)
	if responseSize == 0 || responseSize > 1024*1024 {
		t.Fatalf("Invalid response size: %d", responseSize)
	}

	// Read response body
	responseBody := make([]byte, responseSize)
	totalRead := 0
	for totalRead < int(responseSize) {
		n, err := conn.Read(responseBody[totalRead:])
		if err != nil {
			t.Fatalf("Failed to read response body: %v (read %d/%d bytes)", err, totalRead, responseSize)
		}
		totalRead += n
	}
	t.Logf("Received response: %d bytes", len(responseBody))

	// Parse basic response structure
	if len(responseBody) < 4 {
		t.Fatalf("Response too short: %d bytes", len(responseBody))
	}
	responseCorrelationID := binary.BigEndian.Uint32(responseBody[0:4])
	if responseCorrelationID != correlationID {
		t.Errorf("Correlation ID mismatch: sent %d, got %d", correlationID, responseCorrelationID)
	}
	t.Logf("ApiVersions request completed successfully")
}

// TestGateway_CreateTopicsRequest tests if we can send a CreateTopics request
func TestGateway_CreateTopicsRequest(t *testing.T) {
	// Start the gateway server
	srv := gateway.NewServer(gateway.Options{
		Listen:       ":0",
		UseSeaweedMQ: false,
	})
	if err := srv.Start(); err != nil {
		t.Fatalf("Failed to start gateway: %v", err)
	}
	defer srv.Close()

	brokerAddr := srv.Addr()
	t.Logf("Gateway running on %s", brokerAddr)

	// Create connection
	conn, err := net.Dial("tcp", brokerAddr)
	if err != nil {
		t.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// Set read timeout
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))

	// Build CreateTopics request (API key 19, version 0)
	correlationID := uint32(2)
	clientID := "debug-client"
	topicName := "debug-topic"

	// Build message body
	msgBody := make([]byte, 0, 128)
	msgBody = append(msgBody, 0, 19) // API key 19 (CreateTopics)
	msgBody = append(msgBody, 0, 0)  // API version 0

	// Correlation ID
	correlationBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(correlationBytes, correlationID)
	msgBody = append(msgBody, correlationBytes...)

	// Client ID string
	clientIDBytes := []byte(clientID)
	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
	msgBody = append(msgBody, clientIDBytes...)

	// Topics array - count (4 bytes)
	msgBody = append(msgBody, 0, 0, 0, 1) // 1 topic

	// Topic name
	topicNameBytes := []byte(topicName)
	msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes)))
	msgBody = append(msgBody, topicNameBytes...)

	// Num partitions (4 bytes)
	msgBody = append(msgBody, 0, 0, 0, 1) // 1 partition

	// Replication factor (2 bytes)
	msgBody = append(msgBody, 0, 1) // replication factor 1

	// Configs count (4 bytes)
	msgBody = append(msgBody, 0, 0, 0, 0) // 0 configs

	// Timeout (4 bytes)
	msgBody = append(msgBody, 0, 0, 0x75, 0x30) // 30 seconds

	// Message size + message body
	sizeBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
	request := append(sizeBytes, msgBody...)

	t.Logf("Sending CreateTopics request: %d bytes", len(request))

	// Send request
	_, err = conn.Write(request)
	if err != nil {
		t.Fatalf("Failed to write request: %v", err)
	}

	// Read response size
	var responseSizeBytes [4]byte
	_, err = conn.Read(responseSizeBytes[:])
	if err != nil {
		t.Fatalf("Failed to read response size: %v", err)
	}
	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
	t.Logf("Response size: %d bytes", responseSize)

	// Read response body
	responseBody := make([]byte, responseSize)
	totalRead := 0
	for totalRead < int(responseSize) {
		n, err := conn.Read(responseBody[totalRead:])
		if err != nil {
			t.Fatalf("Failed to read response body: %v (read %d/%d bytes)", err, totalRead, responseSize)
		}
		totalRead += n
	}
	t.Logf("CreateTopics request completed successfully, received %d bytes", len(responseBody))
}

weed/mq/kafka/protocol/handler.go (36)

@@ -163,6 +163,10 @@ func (h *Handler) HandleConn(conn net.Conn) error {
	apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
	correlationID := binary.BigEndian.Uint32(messageBuf[4:8])

	// DEBUG: Log all incoming requests for debugging client compatibility
	fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n",
		apiKey, apiVersion, correlationID, size)

	// TODO: IMPORTANT - API version validation is missing
	// Different API versions have different request/response formats
	// Need to validate apiVersion against supported versions for each API
@@ -207,6 +211,9 @@ func (h *Handler) HandleConn(conn net.Conn) error {
		return fmt.Errorf("handle request: %w", err)
	}

	// DEBUG: Log response details
	fmt.Printf("DEBUG: Sending response for API %d - Size: %d bytes\n", apiKey, len(response))

	// Write response size and data
	responseSizeBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
@@ -488,6 +495,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([]byte, error) {
}

func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
	// TODO: CRITICAL - This function only supports CreateTopics v0 format
	// kafka-go uses v2 which has a different request structure!
	// The wrong topics count (1274981) shows we're parsing from wrong offset
	// Need to implement proper v2 request parsing or negotiate API version

	// Parse minimal CreateTopics request
	// Request format: client_id + timeout(4) + topics_array
@@ -498,17 +510,29 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
	// Skip client_id
	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
	offset := 2 + int(clientIDSize)

	if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
		return nil, fmt.Errorf("CreateTopics request missing data")
	fmt.Printf("DEBUG: Client ID size: %d, client ID: %s\n", clientIDSize, string(requestBody[2:2+clientIDSize]))

	// CreateTopics v2 has different format than v0
	// v2 format: client_id + topics_array + timeout(4) + validate_only(1)
	// (no separate timeout field before topics like in v0)
	if len(requestBody) < offset+4 {
		return nil, fmt.Errorf("CreateTopics request missing topics array")
	}

	// Skip timeout
	offset += 4

	// Read topics count directly (no timeout field before it in v2)
	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	// DEBUG: Hex dump first 50 bytes to understand v2 format
	dumpLen := len(requestBody)
	if dumpLen > 50 {
		dumpLen = 50
	}
	fmt.Printf("DEBUG: CreateTopics v2 request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
	fmt.Printf("DEBUG: CreateTopics v2 - Topics count: %d, remaining bytes: %d\n", topicsCount, len(requestBody)-offset)

	response := make([]byte, 0, 256)

	// Correlation ID

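The remaining item called out in handler.go above is the v2 response format. For orientation, a minimal sketch of a CreateTopics v2 response body follows; it is based on the published Kafka protocol definition (error_message was added to this response in v1, throttle_time_ms in v2), it is not the gateway's implementation, and the helper name is made up. After the correlation ID, the body is throttle_time_ms(4) followed by an array of (topic name, error_code, nullable error_message).

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeCreateTopicsV2Body sketches a CreateTopics v2 response body
// (everything after the correlation ID) with error_code = 0 and a null
// error_message for every topic.
func encodeCreateTopicsV2Body(topics []string) []byte {
	buf := make([]byte, 0, 64)
	buf = append(buf, 0, 0, 0, 0) // throttle_time_ms = 0

	count := make([]byte, 4)
	binary.BigEndian.PutUint32(count, uint32(len(topics)))
	buf = append(buf, count...) // topics array length

	for _, name := range topics {
		nameLen := make([]byte, 2)
		binary.BigEndian.PutUint16(nameLen, uint16(len(name)))
		buf = append(buf, nameLen...)
		buf = append(buf, name...)
		buf = append(buf, 0, 0)       // error_code = 0 (none)
		buf = append(buf, 0xff, 0xff) // error_message = null (length -1)
	}
	return buf
}

func main() {
	body := encodeCreateTopicsV2Body([]string{"debug-topic"})
	fmt.Printf("%d bytes: % x\n", len(body), body) // 25 bytes for one topic
}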