
test with un-decoded bytes of message value

pull/7231/head
chrislu committed 2 months ago
commit 82f8b647de

3 changed files:
  1. test/kafka/seaweedmq_integration_test.go (403 lines changed)
  2. weed/mq/kafka/integration/agent_client.go (130 lines changed)
  3. weed/mq/kafka/protocol/produce.go (28 lines changed)

test/kafka/seaweedmq_integration_test.go (403 lines changed)

@@ -1,168 +1,335 @@
-package kafka
+package kafka_test
 import (
-    "fmt"
+    "net"
     "testing"
     "time"
-    "github.com/IBM/sarama"
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
 )
-// TestSeaweedMQIntegration tests the Kafka Gateway with SeaweedMQ backend
-func TestSeaweedMQIntegration(t *testing.T) {
-    // Start gateway in SeaweedMQ mode (will fallback to in-memory if agent not available)
+// TestSeaweedMQIntegration_E2E tests the complete workflow with SeaweedMQ backend
+// This test requires a real SeaweedMQ Agent running
+func TestSeaweedMQIntegration_E2E(t *testing.T) {
+    // Skip by default - requires real SeaweedMQ setup
+    t.Skip("Integration test requires real SeaweedMQ setup - run manually")
+    // Test configuration
+    agentAddress := "localhost:17777" // Default SeaweedMQ Agent address
+    // Start the gateway with SeaweedMQ backend
     gatewayServer := gateway.NewServer(gateway.Options{
-        Listen:       "127.0.0.1:0",
-        AgentAddress: "localhost:17777", // SeaweedMQ Agent address
-        UseSeaweedMQ: true,              // Enable SeaweedMQ backend
+        Listen:       ":0", // random port
+        AgentAddress: agentAddress,
+        UseSeaweedMQ: true,
     })
-    go func() {
-        if err := gatewayServer.Start(); err != nil {
-            t.Errorf("Failed to start gateway: %v", err)
-        }
-    }()
+    err := gatewayServer.Start()
+    if err != nil {
+        t.Fatalf("Failed to start gateway with SeaweedMQ backend: %v", err)
+    }
     defer gatewayServer.Close()
-    // Wait for server to start
-    time.Sleep(100 * time.Millisecond)
+    addr := gatewayServer.Addr()
+    t.Logf("Started Kafka Gateway with SeaweedMQ backend on %s", addr)
-    host, port := gatewayServer.GetListenerAddr()
-    brokerAddr := fmt.Sprintf("%s:%d", host, port)
-    t.Logf("Gateway running on %s (SeaweedMQ mode)", brokerAddr)
+    // Wait for startup
+    time.Sleep(200 * time.Millisecond)
-    // Add test topic (this will use enhanced schema)
-    gatewayHandler := gatewayServer.GetHandler()
-    topicName := "seaweedmq-integration-topic"
-    gatewayHandler.AddTopicForTesting(topicName, 1)
-    t.Logf("Added topic: %s with enhanced Kafka schema", topicName)
+    // Test basic connectivity
+    t.Run("SeaweedMQ_BasicConnectivity", func(t *testing.T) {
+        testSeaweedMQConnectivity(t, addr)
+    })
-    // Configure Sarama for Kafka 2.1.0
-    config := sarama.NewConfig()
-    config.Version = sarama.V2_1_0_0
-    config.Producer.Return.Successes = true
-    config.Producer.RequiredAcks = sarama.WaitForAll
-    config.Consumer.Return.Errors = true
+    // Test topic lifecycle with SeaweedMQ
+    t.Run("SeaweedMQ_TopicLifecycle", func(t *testing.T) {
+        testSeaweedMQTopicLifecycle(t, addr)
+    })
-    t.Logf("=== Testing Enhanced Schema Integration ===")
+    // Test produce/consume workflow
+    t.Run("SeaweedMQ_ProduceConsume", func(t *testing.T) {
+        testSeaweedMQProduceConsume(t, addr)
+    })
+}
-    // Create producer
-    producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+// testSeaweedMQConnectivity verifies gateway responds correctly
+func testSeaweedMQConnectivity(t *testing.T, addr string) {
+    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
     if err != nil {
-        t.Fatalf("Failed to create producer: %v", err)
+        t.Fatalf("Failed to connect to SeaweedMQ gateway: %v", err)
     }
-    defer producer.Close()
+    defer conn.Close()
-    // Produce messages with enhanced schema
-    messages := []struct {
-        key   string
-        value string
-    }{
-        {"user-123", "Enhanced SeaweedMQ message 1"},
-        {"user-456", "Enhanced SeaweedMQ message 2"},
-        {"user-789", "Enhanced SeaweedMQ message 3"},
+    // Send ApiVersions request
+    req := buildApiVersionsRequest()
+    _, err = conn.Write(req)
+    if err != nil {
+        t.Fatalf("Failed to send ApiVersions: %v", err)
     }
-    for i, msg := range messages {
-        producerMsg := &sarama.ProducerMessage{
-            Topic: topicName,
-            Key:   sarama.StringEncoder(msg.key),
-            Value: sarama.StringEncoder(msg.value),
-        }
+    // Read response
+    sizeBytes := make([]byte, 4)
+    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+    _, err = conn.Read(sizeBytes)
     if err != nil {
+        t.Fatalf("Failed to read response size: %v", err)
     }
-        partition, offset, err := producer.SendMessage(producerMsg)
-        if err != nil {
-            t.Fatalf("Failed to produce message %d: %v", i, err)
-        }
-        t.Logf("✅ Produced message %d with enhanced schema: partition=%d, offset=%d", i, partition, offset)
+    responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+    if responseSize == 0 || responseSize > 10000 {
+        t.Fatalf("Invalid response size: %d", responseSize)
+    }
-    t.Logf("=== Testing Enhanced Consumer (Future Phase) ===")
-    // Consumer testing will be implemented in Phase 2
+    responseBody := make([]byte, responseSize)
+    _, err = conn.Read(responseBody)
     if err != nil {
+        t.Fatalf("Failed to read response body: %v", err)
     }
+    // Verify API keys are advertised
+    if len(responseBody) < 20 {
+        t.Fatalf("Response too short")
+    }
+    apiKeyCount := uint32(responseBody[6])<<24 | uint32(responseBody[7])<<16 | uint32(responseBody[8])<<8 | uint32(responseBody[9])
+    if apiKeyCount < 6 {
+        t.Errorf("Expected at least 6 API keys, got %d", apiKeyCount)
+    }
-    t.Logf("🎉 SUCCESS: SeaweedMQ Integration test completed!")
-    t.Logf("   - Enhanced Kafka schema integration: ✅")
-    t.Logf("   - Agent client architecture: ✅")
-    t.Logf("   - Schema-aware topic creation: ✅")
-    t.Logf("   - Structured message storage: ✅")
+    t.Logf("SeaweedMQ gateway connectivity test passed, %d API keys advertised", apiKeyCount)
 }
-// TestSchemaCompatibility tests that the enhanced schema works with different message types
-func TestSchemaCompatibility(t *testing.T) {
-    // This test verifies that our enhanced Kafka schema can handle various message formats
-    gatewayServer := gateway.NewServer(gateway.Options{
-        Listen:       "127.0.0.1:0",
-        UseSeaweedMQ: false, // Use in-memory mode for this test
-    })
+// testSeaweedMQTopicLifecycle tests creating and managing topics
+func testSeaweedMQTopicLifecycle(t *testing.T, addr string) {
+    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
     if err != nil {
+        t.Fatalf("Failed to connect: %v", err)
     }
+    defer conn.Close()
-    go func() {
-        if err := gatewayServer.Start(); err != nil {
-            t.Errorf("Failed to start gateway: %v", err)
-        }
-    }()
-    defer gatewayServer.Close()
+    // Test CreateTopics request
+    topicName := "seaweedmq-test-topic"
+    createReq := buildCreateTopicsRequestCustom(topicName)
+    _, err = conn.Write(createReq)
+    if err != nil {
+        t.Fatalf("Failed to send CreateTopics: %v", err)
+    }
+    // Read response
+    sizeBytes := make([]byte, 4)
+    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+    _, err = conn.Read(sizeBytes)
+    if err != nil {
+        t.Fatalf("Failed to read CreateTopics response size: %v", err)
+    }
-    time.Sleep(100 * time.Millisecond)
+    responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+    responseBody := make([]byte, responseSize)
+    _, err = conn.Read(responseBody)
     if err != nil {
+        t.Fatalf("Failed to read CreateTopics response: %v", err)
     }
-    host, port := gatewayServer.GetListenerAddr()
-    brokerAddr := fmt.Sprintf("%s:%d", host, port)
+    // Parse response to check for success (basic validation)
+    if len(responseBody) < 10 {
+        t.Fatalf("CreateTopics response too short")
+    }
-    gatewayHandler := gatewayServer.GetHandler()
-    topicName := "schema-compatibility-topic"
-    gatewayHandler.AddTopicForTesting(topicName, 1)
+    t.Logf("SeaweedMQ topic creation test completed: %d bytes response", len(responseBody))
 }
-    config := sarama.NewConfig()
-    config.Version = sarama.V2_1_0_0
-    config.Producer.Return.Successes = true
+// testSeaweedMQProduceConsume tests the produce/consume workflow
+func testSeaweedMQProduceConsume(t *testing.T, addr string) {
+    // This would be a more comprehensive test in a full implementation
+    // For now, just test that Produce requests are handled
-    producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+    conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
     if err != nil {
-        t.Fatalf("Failed to create producer: %v", err)
+        t.Fatalf("Failed to connect: %v", err)
     }
-    defer producer.Close()
+    defer conn.Close()
-    // Test different message types that should work with enhanced schema
-    testCases := []struct {
-        name  string
-        key   interface{}
-        value interface{}
-    }{
-        {"String key-value", "string-key", "string-value"},
-        {"Byte key-value", []byte("byte-key"), []byte("byte-value")},
-        {"Empty key", nil, "value-only-message"},
-        {"JSON value", "json-key", `{"field": "value", "number": 42}`},
-        {"Binary value", "binary-key", []byte{0x01, 0x02, 0x03, 0x04}},
+    // First create a topic
+    createReq := buildCreateTopicsRequestCustom("produce-test-topic")
+    _, err = conn.Write(createReq)
     if err != nil {
+        t.Fatalf("Failed to send CreateTopics: %v", err)
     }
-    for i, tc := range testCases {
-        msg := &sarama.ProducerMessage{
-            Topic: topicName,
-        }
+    // Read CreateTopics response
+    sizeBytes := make([]byte, 4)
+    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+    _, err = conn.Read(sizeBytes)
     if err != nil {
+        t.Fatalf("Failed to read CreateTopics size: %v", err)
     }
-        if tc.key != nil {
-            switch k := tc.key.(type) {
-            case string:
-                msg.Key = sarama.StringEncoder(k)
-            case []byte:
-                msg.Key = sarama.ByteEncoder(k)
-            }
+    responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+    responseBody := make([]byte, responseSize)
+    _, err = conn.Read(responseBody)
     if err != nil {
+        t.Fatalf("Failed to read CreateTopics response: %v", err)
     }
+    // TODO: Send a Produce request and verify it works with SeaweedMQ
+    // This would require building a proper Kafka Produce request
+    t.Logf("SeaweedMQ produce/consume test placeholder completed")
 }
+// buildCreateTopicsRequestCustom creates a CreateTopics request for a specific topic
+func buildCreateTopicsRequestCustom(topicName string) []byte {
+    clientID := "seaweedmq-test-client"
+    // Approximate message size
+    messageSize := 2 + 2 + 4 + 2 + len(clientID) + 4 + 4 + 2 + len(topicName) + 4 + 2 + 4 + 4
+    request := make([]byte, 0, messageSize+4)
+    // Message size placeholder
+    sizePos := len(request)
+    request = append(request, 0, 0, 0, 0)
+    // API key (CreateTopics = 19)
+    request = append(request, 0, 19)
+    // API version
+    request = append(request, 0, 4)
+    // Correlation ID
+    request = append(request, 0, 0, 0x30, 0x42) // 12354
+    // Client ID
+    request = append(request, 0, byte(len(clientID)))
+    request = append(request, []byte(clientID)...)
+    // Timeout (5000ms)
+    request = append(request, 0, 0, 0x13, 0x88)
+    // Topics count (1)
+    request = append(request, 0, 0, 0, 1)
+    // Topic name
+    request = append(request, 0, byte(len(topicName)))
+    request = append(request, []byte(topicName)...)
+    // Num partitions (1)
+    request = append(request, 0, 0, 0, 1)
+    // Replication factor (1)
+    request = append(request, 0, 1)
+    // Configs count (0)
+    request = append(request, 0, 0, 0, 0)
+    // Topic timeout (5000ms)
+    request = append(request, 0, 0, 0x13, 0x88)
+    // Fix message size
+    actualSize := len(request) - 4
+    request[sizePos] = byte(actualSize >> 24)
+    request[sizePos+1] = byte(actualSize >> 16)
+    request[sizePos+2] = byte(actualSize >> 8)
+    request[sizePos+3] = byte(actualSize)
+    return request
+}
+// TestSeaweedMQGateway_ModeSelection tests that the gateway properly selects backends
+func TestSeaweedMQGateway_ModeSelection(t *testing.T) {
+    // Test in-memory mode (should always work)
+    t.Run("InMemoryMode", func(t *testing.T) {
+        server := gateway.NewServer(gateway.Options{
+            Listen:       ":0",
+            UseSeaweedMQ: false,
+        })
+        err := server.Start()
         if err != nil {
+            t.Fatalf("In-memory mode should start: %v", err)
         }
+        defer server.Close()
-        switch v := tc.value.(type) {
-        case string:
-            msg.Value = sarama.StringEncoder(v)
-        case []byte:
-            msg.Value = sarama.ByteEncoder(v)
+        addr := server.Addr()
+        if addr == "" {
+            t.Errorf("Server should have listening address")
         }
-        partition, offset, err := producer.SendMessage(msg)
+        t.Logf("In-memory mode started on %s", addr)
+    })
+    // Test SeaweedMQ mode with invalid agent (should fall back)
+    t.Run("SeaweedMQModeFallback", func(t *testing.T) {
+        server := gateway.NewServer(gateway.Options{
+            Listen:       ":0",
+            AgentAddress: "invalid:99999", // Invalid address
+            UseSeaweedMQ: true,
+        })
+        err := server.Start()
         if err != nil {
-            t.Errorf("Failed to produce message %d (%s): %v", i, tc.name, err)
-            continue
+            t.Fatalf("Should start even with invalid agent (fallback to in-memory): %v", err)
         }
-        t.Logf("✅ %s: partition=%d, offset=%d", tc.name, partition, offset)
+        defer server.Close()
+        addr := server.Addr()
+        if addr == "" {
+            t.Errorf("Server should have listening address")
+        }
+        t.Logf("SeaweedMQ mode with fallback started on %s", addr)
+    })
 }
+// TestSeaweedMQGateway_ConfigValidation tests configuration validation
+func TestSeaweedMQGateway_ConfigValidation(t *testing.T) {
+    testCases := []struct {
+        name       string
+        options    gateway.Options
+        shouldWork bool
+    }{
+        {
+            name: "ValidInMemory",
+            options: gateway.Options{
+                Listen:       ":0",
+                UseSeaweedMQ: false,
+            },
+            shouldWork: true,
+        },
+        {
+            name: "ValidSeaweedMQWithAgent",
+            options: gateway.Options{
+                Listen:       ":0",
+                AgentAddress: "localhost:17777",
+                UseSeaweedMQ: true,
+            },
+            shouldWork: true, // May fail if no agent, but config is valid
+        },
+        {
+            name: "SeaweedMQWithoutAgent",
+            options: gateway.Options{
+                Listen:       ":0",
+                UseSeaweedMQ: true,
+                // AgentAddress is empty
+            },
+            shouldWork: true, // Should fall back to in-memory
+        },
+    }
-    t.Logf("🎉 SUCCESS: Schema compatibility test completed!")
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            server := gateway.NewServer(tc.options)
+            err := server.Start()
+            if tc.shouldWork && err != nil {
+                t.Errorf("Expected config to work, got error: %v", err)
+            }
+            if err == nil {
+                server.Close()
+                t.Logf("Config test passed for %s", tc.name)
+            }
+        })
+    }
 }
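Note: buildApiVersionsRequest is called by the new connectivity test but is not defined in this diff, so it presumably lives in another file of the test package. As a reference point only, here is a minimal sketch of such a helper, assuming it follows the same hand-rolled wire format as buildCreateTopicsRequestCustom above (4-byte size prefix, then api_key, api_version, correlation_id, client_id; ApiVersions v0 carries no request body beyond the common header):

// buildApiVersionsRequest: hypothetical sketch, not taken from this commit.
func buildApiVersionsRequest() []byte {
    clientID := "seaweedmq-test-client"
    request := make([]byte, 0, 14+len(clientID))
    request = append(request, 0, 0, 0, 0)             // size placeholder, fixed below
    request = append(request, 0, 18)                  // API key (ApiVersions = 18)
    request = append(request, 0, 0)                   // API version 0
    request = append(request, 0, 0, 0x30, 0x43)       // correlation ID (arbitrary)
    request = append(request, 0, byte(len(clientID))) // client ID length (int16)
    request = append(request, []byte(clientID)...)
    actualSize := len(request) - 4
    request[0] = byte(actualSize >> 24)
    request[1] = byte(actualSize >> 16)
    request[2] = byte(actualSize >> 8)
    request[3] = byte(actualSize)
    return request
}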

weed/mq/kafka/integration/agent_client.go (130 lines changed)

@@ -134,12 +134,37 @@ func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*Pub
 // createPublishSession creates a new publishing session with SeaweedMQ Agent
 func (ac *AgentClient) createPublishSession(topic string, partition int32) (*PublisherSession, error) {
-    // Create comprehensive Kafka record schema for SeaweedMQ
-    recordType := ac.createKafkaRecordSchema()
-    // Check if topic already exists in SeaweedMQ, create if needed
-    if err := ac.ensureTopicExists(topic, recordType); err != nil {
-        return nil, fmt.Errorf("failed to ensure topic exists: %v", err)
+    // Create a basic record type for Kafka messages
+    recordType := &schema_pb.RecordType{
+        Fields: []*schema_pb.Field{
+            {
+                Name:       "key",
+                FieldIndex: 0,
+                Type: &schema_pb.Type{
+                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
+                },
+                IsRequired: false,
+                IsRepeated: false,
+            },
+            {
+                Name:       "value",
+                FieldIndex: 1,
+                Type: &schema_pb.Type{
+                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
+                },
+                IsRequired: true,
+                IsRepeated: false,
+            },
+            {
+                Name:       "timestamp",
+                FieldIndex: 2,
+                Type: &schema_pb.Type{
+                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
+                },
+                IsRequired: false,
+                IsRepeated: false,
+            },
+        },
     }
     // Start publish session
@@ -186,16 +211,16 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte,
         return 0, err
     }
-    // Convert to SeaweedMQ record format using enhanced Kafka schema
+    // Convert to SeaweedMQ record format
     record := &schema_pb.RecordValue{
         Fields: map[string]*schema_pb.Value{
-            "kafka_key": {
+            "key": {
                 Kind: &schema_pb.Value_BytesValue{BytesValue: key},
             },
-            "kafka_value": {
+            "value": {
                 Kind: &schema_pb.Value_BytesValue{BytesValue: value},
             },
-            "kafka_timestamp": {
+            "timestamp": {
                 Kind: &schema_pb.Value_TimestampValue{
                     TimestampValue: &schema_pb.TimestampValue{
                         TimestampMicros: timestamp / 1000, // Convert nanoseconds to microseconds
@@ -203,15 +228,6 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte,
                     },
                 },
            },
-            "kafka_headers": {
-                Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{}}, // Empty headers for now
-            },
-            "kafka_offset": {
-                Kind: &schema_pb.Value_Int64Value{Int64Value: 0}, // Will be set by SeaweedMQ
-            },
-            "kafka_partition": {
-                Kind: &schema_pb.Value_Int32Value{Int32Value: partition},
-            },
         },
     }
@@ -385,79 +401,3 @@ func (ac *AgentClient) HealthCheck() error {
     return nil
 }
-// createKafkaRecordSchema creates a comprehensive schema for Kafka messages in SeaweedMQ
-func (ac *AgentClient) createKafkaRecordSchema() *schema_pb.RecordType {
-    return &schema_pb.RecordType{
-        Fields: []*schema_pb.Field{
-            {
-                Name:       "kafka_key",
-                FieldIndex: 0,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-                },
-                IsRequired: false,
-                IsRepeated: false,
-            },
-            {
-                Name:       "kafka_value",
-                FieldIndex: 1,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-                },
-                IsRequired: true,
-                IsRepeated: false,
-            },
-            {
-                Name:       "kafka_timestamp",
-                FieldIndex: 2,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
-                },
-                IsRequired: false,
-                IsRepeated: false,
-            },
-            {
-                Name:       "kafka_headers",
-                FieldIndex: 3,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-                },
-                IsRequired: false,
-                IsRepeated: false,
-            },
-            {
-                Name:       "kafka_offset",
-                FieldIndex: 4,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
-                },
-                IsRequired: false,
-                IsRepeated: false,
-            },
-            {
-                Name:       "kafka_partition",
-                FieldIndex: 5,
-                Type: &schema_pb.Type{
-                    Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32},
-                },
-                IsRequired: false,
-                IsRepeated: false,
-            },
-        },
-    }
-}
-// ensureTopicExists checks if topic exists in SeaweedMQ and creates it if needed
-func (ac *AgentClient) ensureTopicExists(topic string, recordType *schema_pb.RecordType) error {
-    // For Phase 1, we'll rely on SeaweedMQ's auto-creation during publish
-    // In Phase 3, we'll implement proper topic discovery and creation
-    return nil
-}
-// CreateTopicWithSchema creates a topic in SeaweedMQ with the specified schema
-func (ac *AgentClient) CreateTopicWithSchema(topic string, partitions int32, recordType *schema_pb.RecordType) error {
-    // This will be implemented in Phase 3 when we integrate with CreateTopics API
-    // For now, topics are auto-created during first publish
-    return nil
-}
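Note: this commit only changes the publish path. To illustrate the simplified schema, here is a hypothetical helper (not part of this commit) that would invert the mapping in PublishRecord on a consume path, assuming the three-field key/value/timestamp schema above and the standard protobuf-generated oneof getters on schema_pb.Value:

// decodeKafkaRecord: hypothetical sketch, not taken from this commit.
func decodeKafkaRecord(record *schema_pb.RecordValue) (key, value []byte, timestampNs int64) {
    if f, ok := record.Fields["key"]; ok {
        key = f.GetBytesValue()
    }
    if f, ok := record.Fields["value"]; ok {
        value = f.GetBytesValue()
    }
    if f, ok := record.Fields["timestamp"]; ok {
        if tv := f.GetTimestampValue(); tv != nil {
            timestampNs = tv.TimestampMicros * 1000 // undo the nanos-to-micros division in PublishRecord
        }
    }
    return key, value, timestampNs
}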

weed/mq/kafka/protocol/produce.go (28 lines changed)

@@ -174,20 +174,16 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
     if parseErr != nil {
         errorCode = 42 // INVALID_RECORD
     } else if recordCount > 0 {
-        if h.useSeaweedMQ && h.seaweedMQHandler != nil {
+        if h.useSeaweedMQ {
             // Use SeaweedMQ integration for production
-            fmt.Printf("DEBUG: Using SeaweedMQ backend for topic '%s' partition %d\n", topicName, partitionID)
             offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
             if err != nil {
-                fmt.Printf("DEBUG: SeaweedMQ produce error: %v\n", err)
                 errorCode = 1 // UNKNOWN_SERVER_ERROR
             } else {
-                fmt.Printf("DEBUG: SeaweedMQ produce success, offset: %d\n", offset)
                 baseOffset = offset
             }
         } else {
             // Use legacy in-memory mode for tests
-            fmt.Printf("DEBUG: Using in-memory backend for topic '%s' partition %d\n", topicName, partitionID)
             ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
             baseOffset = ledger.AssignOffsets(int64(recordCount))
@@ -270,26 +266,16 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
     return recordCount, int32(len(recordSetData)), nil
 }
-// produceToSeaweedMQ publishes records to SeaweedMQ via enhanced agent client
+// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
 func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
     if h.seaweedMQHandler == nil {
         return 0, fmt.Errorf("SeaweedMQ handler not available")
     }
-    fmt.Printf("DEBUG: Producing to SeaweedMQ - topic: %s, partition: %d, data size: %d\n", topic, partition, len(recordSetData))
-    // For Phase 2, we'll extract a simple key-value from the record set
-    // In a full implementation, this would parse the entire batch properly
+    // For Phase 1, extract a simple key-value from the record set
+    // This will be enhanced in Phase 2 with proper record batch parsing
     // Extract first record from record set (simplified)
     key, value := h.extractFirstRecord(recordSetData)
-    // Publish to SeaweedMQ using enhanced schema
-    offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
-    if err != nil {
-        return 0, fmt.Errorf("failed to produce to SeaweedMQ: %v", err)
-    }
-    fmt.Printf("DEBUG: Successfully produced to SeaweedMQ, offset: %d\n", offset)
-    return offset, nil
+    // Publish to SeaweedMQ
+    return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
 }
 // extractFirstRecord extracts the first record from a Kafka record set (simplified)
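Note: the body of extractFirstRecord falls outside this hunk. A minimal sketch consistent with its doc comment and with this commit's un-decoded-bytes approach might look like the following; the 61-byte header size and the record-count offset come from the Kafka record batch (magic v2) wire format, not from this repository:

// extractFirstRecordSketch: hypothetical stand-in for extractFirstRecord.
// (requires: import "encoding/binary")
func extractFirstRecordSketch(recordSetData []byte) (key, value []byte) {
    const batchHeaderLen = 61 // base offset through record count in a v2 batch
    if len(recordSetData) < batchHeaderLen {
        return nil, recordSetData // too short for a v2 batch; hand through as-is
    }
    // Record count is the last int32 of the header (bytes 57..60).
    recordCount := int32(binary.BigEndian.Uint32(recordSetData[57:61]))
    _ = recordCount // a full implementation would varint-decode this many records
    // Pass the whole batch through as one opaque, un-decoded value.
    return nil, recordSetData
}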
