Browse Source

refactoring

pull/7231/head
chrislu 2 months ago
parent
commit
c0b15ed489
  1. 2
      test/kafka/api_sequence_test.go
  2. 6
      test/kafka/client_integration_test.go
  3. 4
      test/kafka/comprehensive_e2e_test.go
  4. 2
      test/kafka/connection_close_debug_test.go
  5. 2
      test/kafka/connection_debug_test.go
  6. 2
      test/kafka/consumer_group_debug_test.go
  7. 6
      test/kafka/consumer_group_test.go
  8. 2
      test/kafka/consumer_only_test.go
  9. 2
      test/kafka/consumer_test.go
  10. 6
      test/kafka/debug_connection_test.go
  11. 2
      test/kafka/debug_consumer_group_test.go
  12. 2
      test/kafka/debug_produce_response_test.go
  13. 2
      test/kafka/debug_produce_v7_test.go
  14. 2
      test/kafka/debug_readpartitions_test.go
  15. 6
      test/kafka/e2e_test.go
  16. 4
      test/kafka/gateway_smoke_test.go
  17. 2
      test/kafka/joingroup_debug_test.go
  18. 2
      test/kafka/kafka_go_debug_test.go
  19. 2
      test/kafka/kafka_go_internal_debug_test.go
  20. 2
      test/kafka/kafka_go_metadata_test.go
  21. 22
      test/kafka/kafka_go_produce_only_test.go
  22. 2
      test/kafka/metadata_comparison_test.go
  23. 2
      test/kafka/metadata_debug_test.go
  24. 2
      test/kafka/metadata_v1_isolation_test.go
  25. 293
      test/kafka/mock_smq_handler.go
  26. 2
      test/kafka/network_capture_test.go
  27. 2
      test/kafka/parsing_debug_test.go
  28. 2
      test/kafka/produce_consume_cycle_test.go
  29. 2
      test/kafka/produce_consume_test.go
  30. 2
      test/kafka/raw_protocol_test.go
  31. 11
      test/kafka/sarama_e2e_test.go
  32. 21
      test/kafka/sarama_simple_test.go
  33. 14
      test/kafka/sarama_test.go
  34. 8
      test/kafka/seaweedmq_integration_test.go
  35. 23
      weed/mq/kafka/gateway/server.go
  36. 100
      weed/mq/kafka/gateway/server_test.go
  37. 37
      weed/mq/kafka/gateway/server_testing.go
  38. 9
      weed/mq/kafka/integration/seaweedmq_handler.go
  39. 31
      weed/mq/kafka/offset/ledger.go
  40. 4
      weed/mq/kafka/protocol/consumer_coordination_test.go
  41. 85
      weed/mq/kafka/protocol/fetch.go
  42. 266
      weed/mq/kafka/protocol/handler.go
  43. 558
      weed/mq/kafka/protocol/handler_test.go
  44. 299
      weed/mq/kafka/protocol/handler_testing.go
  45. 6
      weed/mq/kafka/protocol/offset_management_test.go

2
test/kafka/api_sequence_test.go

@ -13,7 +13,7 @@ import (
// TestKafkaGateway_APISequence logs all API requests that kafka-go makes
func TestKafkaGateway_APISequence(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

6
test/kafka/client_integration_test.go

@ -15,7 +15,7 @@ import (
// TestKafkaGoClient_BasicProduceConsume tests our gateway with real kafka-go client
func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0", // Use random port
})
@ -81,7 +81,7 @@ func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
// TestKafkaGoClient_ConsumerGroups tests consumer group functionality
func TestKafkaGoClient_ConsumerGroups(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})
@ -134,7 +134,7 @@ func TestKafkaGoClient_MultiplePartitions(t *testing.T) {
// TestKafkaGoClient_OffsetManagement tests offset commit/fetch operations
func TestKafkaGoClient_OffsetManagement(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

4
test/kafka/comprehensive_e2e_test.go

@ -15,7 +15,7 @@ import (
// TestComprehensiveE2E tests both kafka-go and Sarama clients in a comprehensive scenario
func TestComprehensiveE2E(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
@ -357,7 +357,7 @@ func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
// TestOffsetManagement tests offset commit and fetch operations
func TestOffsetManagement(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/connection_close_debug_test.go

@ -14,7 +14,7 @@ import (
// TestConnectionCloseDebug captures the exact moment kafka-go closes the connection
func TestConnectionCloseDebug(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

2
test/kafka/connection_debug_test.go

@ -13,7 +13,7 @@ import (
// TestConnectionDebug debugs the exact connection behavior between kafka-go and our gateway
func TestConnectionDebug(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

2
test/kafka/consumer_group_debug_test.go

@ -12,7 +12,7 @@ import (
func TestConsumerGroup_Debug(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {

6
test/kafka/consumer_group_test.go

@ -13,7 +13,7 @@ import (
func TestConsumerGroup_BasicFunctionality(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
@ -203,7 +203,7 @@ func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {
@ -402,7 +402,7 @@ func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, cl
func TestConsumerGroup_Rebalancing(t *testing.T) {
// Start Kafka gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: ":0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: ":0"})
go func() {
if err := gatewayServer.Start(); err != nil {

2
test/kafka/consumer_only_test.go

@ -13,7 +13,7 @@ import (
// TestConsumerOnly tests only the consumer workflow to debug SyncGroup issue
func TestConsumerOnly(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

2
test/kafka/consumer_test.go

@ -13,7 +13,7 @@ import (
// TestKafkaGoReader tests if kafka-go Reader has different validation than Writer
func TestKafkaGoReader(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

6
test/kafka/debug_connection_test.go

@ -12,7 +12,7 @@ import (
// TestGateway_BasicConnection tests if the gateway can handle basic TCP connections
func TestGateway_BasicConnection(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})
@ -37,7 +37,7 @@ func TestGateway_BasicConnection(t *testing.T) {
// TestGateway_ApiVersionsRequest tests if we can send an ApiVersions request
func TestGateway_ApiVersionsRequest(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})
@ -138,7 +138,7 @@ func TestGateway_ApiVersionsRequest(t *testing.T) {
// TestGateway_CreateTopicsRequest tests if we can send a CreateTopics request
func TestGateway_CreateTopicsRequest(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

2
test/kafka/debug_consumer_group_test.go

@ -12,7 +12,7 @@ import (
func TestDebugConsumerGroupWorkflow(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/debug_produce_response_test.go

@ -12,7 +12,7 @@ import (
func TestDebugProduceV7Response(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/debug_produce_v7_test.go

@ -11,7 +11,7 @@ import (
func TestDebugProduceV7Format(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/debug_readpartitions_test.go

@ -11,7 +11,7 @@ import (
func TestDebugReadPartitions(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

6
test/kafka/e2e_test.go

@ -12,7 +12,7 @@ import (
// TestKafkaGateway_E2E tests the complete Kafka workflow using the gateway
func TestKafkaGateway_E2E(t *testing.T) {
// Start the gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // use random port
})
@ -331,7 +331,7 @@ func TestKafkaGateway_ProduceConsume(t *testing.T) {
// TestKafkaGateway_MultipleClients tests concurrent client connections
func TestKafkaGateway_MultipleClients(t *testing.T) {
// Start the gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // use random port
})
@ -393,7 +393,7 @@ func TestKafkaGateway_StressTest(t *testing.T) {
}
// Start the gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

4
test/kafka/gateway_smoke_test.go

@ -9,7 +9,7 @@ import (
)
func TestGateway_StartAcceptsConnections(t *testing.T) {
srv := gateway.NewServer(gateway.Options{Listen: ":0"})
srv := gateway.NewTestServer(gateway.Options{Listen: ":0"})
if err := srv.Start(); err != nil {
t.Fatalf("start gateway: %v", err)
}
@ -28,7 +28,7 @@ func TestGateway_StartAcceptsConnections(t *testing.T) {
}
func TestGateway_RefusesAfterClose(t *testing.T) {
srv := gateway.NewServer(gateway.Options{Listen: ":0"})
srv := gateway.NewTestServer(gateway.Options{Listen: ":0"})
if err := srv.Start(); err != nil {
t.Fatalf("start gateway: %v", err)
}

2
test/kafka/joingroup_debug_test.go

@ -13,7 +13,7 @@ import (
// TestJoinGroupDebug captures the exact JoinGroup request/response to debug format issues
func TestJoinGroupDebug(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

2
test/kafka/kafka_go_debug_test.go

@ -14,7 +14,7 @@ import (
// TestKafkaGoDeepDebug attempts to get more detailed error information from kafka-go
func TestKafkaGoDeepDebug(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/kafka_go_internal_debug_test.go

@ -15,7 +15,7 @@ import (
// TestKafkaGoInternalDebug attempts to debug kafka-go's internal parsing by intercepting the read operations
func TestKafkaGoInternalDebug(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/kafka_go_metadata_test.go

@ -12,7 +12,7 @@ import (
// TestKafkaGoMetadataV1Compatibility tests if our Metadata v1 response is compatible with kafka-go
func TestKafkaGoMetadataV1Compatibility(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

22
test/kafka/kafka_go_produce_only_test.go

@ -11,8 +11,10 @@ import (
)
func TestKafkaGo_ProduceOnly(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
// Start gateway with test server
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)
@ -47,7 +49,7 @@ func TestKafkaGo_ProduceOnly(t *testing.T) {
func TestKafkaGo_ProduceConsume(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: "127.0.0.1:0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)
@ -112,7 +114,7 @@ func TestKafkaGo_ProduceConsume(t *testing.T) {
t.Fatalf("kafka-go consume failed at message %d: %v", i, err)
}
consumedMessages = append(consumedMessages, msg)
t.Logf("✅ Consumed message %d: key=%s, value=%s, offset=%d",
t.Logf("✅ Consumed message %d: key=%s, value=%s, offset=%d",
i, string(msg.Key), string(msg.Value), msg.Offset)
}
@ -124,11 +126,11 @@ func TestKafkaGo_ProduceConsume(t *testing.T) {
for i, consumed := range consumedMessages {
expected := testMessages[i]
if string(consumed.Key) != string(expected.Key) {
t.Errorf("Message %d key mismatch: got %s, want %s",
t.Errorf("Message %d key mismatch: got %s, want %s",
i, string(consumed.Key), string(expected.Key))
}
if string(consumed.Value) != string(expected.Value) {
t.Errorf("Message %d value mismatch: got %s, want %s",
t.Errorf("Message %d value mismatch: got %s, want %s",
i, string(consumed.Value), string(expected.Value))
}
}
@ -138,7 +140,7 @@ func TestKafkaGo_ProduceConsume(t *testing.T) {
func TestKafkaGo_ConsumerGroup(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: "127.0.0.1:0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)
@ -205,7 +207,7 @@ func TestKafkaGo_ConsumerGroup(t *testing.T) {
t.Fatalf("kafka-go consumer group failed at message %d: %v", i, err)
}
consumedMessages = append(consumedMessages, msg)
t.Logf("✅ Consumer group consumed message %d: key=%s, value=%s, offset=%d",
t.Logf("✅ Consumer group consumed message %d: key=%s, value=%s, offset=%d",
i, string(msg.Key), string(msg.Value), msg.Offset)
}
@ -217,11 +219,11 @@ func TestKafkaGo_ConsumerGroup(t *testing.T) {
for i, consumed := range consumedMessages {
expected := testMessages[i]
if string(consumed.Key) != string(expected.Key) {
t.Errorf("Message %d key mismatch: got %s, want %s",
t.Errorf("Message %d key mismatch: got %s, want %s",
i, string(consumed.Key), string(expected.Key))
}
if string(consumed.Value) != string(expected.Value) {
t.Errorf("Message %d value mismatch: got %s, want %s",
t.Errorf("Message %d value mismatch: got %s, want %s",
i, string(consumed.Value), string(expected.Value))
}
}

2
test/kafka/metadata_comparison_test.go

@ -10,7 +10,7 @@ import (
func TestMetadataResponseComparison(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: "127.0.0.1:0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)

2
test/kafka/metadata_debug_test.go

@ -11,7 +11,7 @@ import (
func TestMetadataV6Debug(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
gatewayServer := gateway.NewTestServer(gateway.Options{Listen: "127.0.0.1:0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)

2
test/kafka/metadata_v1_isolation_test.go

@ -15,7 +15,7 @@ import (
// TestMetadataV1Isolation creates a minimal test to isolate the Metadata v1 parsing issue
func TestMetadataV1Isolation(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

293
test/kafka/mock_smq_handler.go

@ -0,0 +1,293 @@
package kafka
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// MockSMQHandler provides a realistic SeaweedMQ simulation for testing
// It behaves like the real SeaweedMQ integration but uses in-memory storage
type MockSMQHandler struct {
// Topic management
topicsMu sync.RWMutex
topics map[string]*MockTopicInfo
// Message storage - simulates SeaweedMQ's persistent storage
messagesMu sync.RWMutex
messages map[string]map[int32][]*MockSMQRecord // topic -> partition -> []records
// Offset management - simulates Kafka offset ledgers
ledgersMu sync.RWMutex
ledgers map[string]*offset.Ledger // topic-partition -> ledger
// Simulated SMQ timestamp tracking
lastTimestamp map[string]map[int32]int64 // topic -> partition -> last_timestamp
}
// MockTopicInfo represents a Kafka topic in the mock SMQ environment
type MockTopicInfo struct {
Name string
Partitions int32
CreatedAt int64
Schema *schema_pb.Topic
}
// MockSMQRecord represents a record in the mock SeaweedMQ storage
type MockSMQRecord struct {
Key []byte
Value []byte
Timestamp int64 // SeaweedMQ timestamp (nanoseconds)
Offset int64 // Kafka offset for this partition
}
// Implement SMQRecord interface
func (r *MockSMQRecord) GetKey() []byte { return r.Key }
func (r *MockSMQRecord) GetValue() []byte { return r.Value }
func (r *MockSMQRecord) GetTimestamp() int64 { return r.Timestamp }
func (r *MockSMQRecord) GetOffset() int64 { return r.Offset }
// NewMockSMQHandler creates a new mock SeaweedMQ handler for testing
func NewMockSMQHandler() *MockSMQHandler {
return &MockSMQHandler{
topics: make(map[string]*MockTopicInfo),
messages: make(map[string]map[int32][]*MockSMQRecord),
ledgers: make(map[string]*offset.Ledger),
lastTimestamp: make(map[string]map[int32]int64),
}
}
// TopicExists checks if a topic exists in the mock SMQ
func (m *MockSMQHandler) TopicExists(topic string) bool {
m.topicsMu.RLock()
defer m.topicsMu.RUnlock()
_, exists := m.topics[topic]
return exists
}
// ListTopics returns all topics in the mock SMQ
func (m *MockSMQHandler) ListTopics() []string {
m.topicsMu.RLock()
defer m.topicsMu.RUnlock()
topics := make([]string, 0, len(m.topics))
for name := range m.topics {
topics = append(topics, name)
}
return topics
}
// CreateTopic creates a new topic in the mock SMQ
func (m *MockSMQHandler) CreateTopic(topic string, partitions int32) error {
m.topicsMu.Lock()
defer m.topicsMu.Unlock()
if _, exists := m.topics[topic]; exists {
return fmt.Errorf("topic %s already exists", topic)
}
// Create topic info
m.topics[topic] = &MockTopicInfo{
Name: topic,
Partitions: partitions,
CreatedAt: time.Now().UnixNano(),
Schema: &schema_pb.Topic{
Name: topic,
// PartitionCount removed - not part of schema_pb.Topic
},
}
// Initialize message storage for all partitions
m.messagesMu.Lock()
m.messages[topic] = make(map[int32][]*MockSMQRecord)
for i := int32(0); i < partitions; i++ {
m.messages[topic][i] = make([]*MockSMQRecord, 0, 1000)
}
m.messagesMu.Unlock()
// Initialize timestamp tracking
if m.lastTimestamp[topic] == nil {
m.lastTimestamp[topic] = make(map[int32]int64)
}
for i := int32(0); i < partitions; i++ {
m.lastTimestamp[topic][i] = 0
}
fmt.Printf("MOCK SMQ: Created topic %s with %d partitions\n", topic, partitions)
return nil
}
// DeleteTopic removes a topic from the mock SMQ
func (m *MockSMQHandler) DeleteTopic(topic string) error {
m.topicsMu.Lock()
defer m.topicsMu.Unlock()
if _, exists := m.topics[topic]; !exists {
return fmt.Errorf("topic %s does not exist", topic)
}
delete(m.topics, topic)
// Clean up messages
m.messagesMu.Lock()
delete(m.messages, topic)
m.messagesMu.Unlock()
// Clean up ledgers
m.ledgersMu.Lock()
keysToDelete := make([]string, 0)
for key := range m.ledgers {
if key[:len(topic)+1] == topic+"-" {
keysToDelete = append(keysToDelete, key)
}
}
for _, key := range keysToDelete {
delete(m.ledgers, key)
}
m.ledgersMu.Unlock()
fmt.Printf("MOCK SMQ: Deleted topic %s\n", topic)
return nil
}
// GetOrCreateLedger gets or creates a Kafka offset ledger for a topic-partition
func (m *MockSMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
key := fmt.Sprintf("%s-%d", topic, partition)
m.ledgersMu.Lock()
defer m.ledgersMu.Unlock()
if ledger, exists := m.ledgers[key]; exists {
return ledger
}
// Create new ledger
ledger := offset.NewLedger()
m.ledgers[key] = ledger
// Ensure topic is created
if !m.TopicExists(topic) {
m.CreateTopic(topic, partition+1) // Ensure enough partitions
}
return ledger
}
// GetLedger retrieves an existing ledger or returns nil
func (m *MockSMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
key := fmt.Sprintf("%s-%d", topic, partition)
m.ledgersMu.RLock()
defer m.ledgersMu.RUnlock()
return m.ledgers[key]
}
// ProduceRecord publishes a record to the mock SeaweedMQ and updates Kafka offset tracking
func (m *MockSMQHandler) ProduceRecord(topic string, partition int32, key, value []byte) (int64, error) {
// Verify topic exists
if !m.TopicExists(topic) {
return 0, fmt.Errorf("topic %s does not exist", topic)
}
// Get current timestamp (simulate SeaweedMQ timestamp)
timestamp := time.Now().UnixNano()
// Get or create Kafka offset ledger
ledger := m.GetOrCreateLedger(topic, partition)
// Assign Kafka offset
kafkaOffset := ledger.AssignOffsets(1)
// Record in ledger (simulates Kafka offset -> SMQ timestamp mapping)
messageSize := int32(len(value))
if err := ledger.AppendRecord(kafkaOffset, timestamp, messageSize); err != nil {
return 0, fmt.Errorf("failed to append to ledger: %w", err)
}
// Store message in mock SMQ storage (simulates persistent storage)
m.messagesMu.Lock()
defer m.messagesMu.Unlock()
if m.messages[topic] == nil {
m.messages[topic] = make(map[int32][]*MockSMQRecord)
}
if m.messages[topic][partition] == nil {
m.messages[topic][partition] = make([]*MockSMQRecord, 0, 1000)
}
// Create message record (simulate SMQ storage)
record := &MockSMQRecord{
Key: append([]byte(nil), key...), // Deep copy key
Value: append([]byte(nil), value...), // Deep copy value
Timestamp: timestamp,
Offset: kafkaOffset,
}
// Append to partition (simulate SMQ append-only log)
m.messages[topic][partition] = append(m.messages[topic][partition], record)
// Update last timestamp
m.lastTimestamp[topic][partition] = timestamp
fmt.Printf("MOCK SMQ: Stored record - topic:%s, partition:%d, kafka_offset:%d, smq_timestamp:%d, key:%s, value:%s\n",
topic, partition, kafkaOffset, timestamp, string(key), string(value))
return kafkaOffset, nil
}
// GetStoredRecords retrieves records from mock SMQ storage starting from a given Kafka offset
func (m *MockSMQHandler) GetStoredRecords(topic string, partition int32, fromKafkaOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
m.messagesMu.RLock()
defer m.messagesMu.RUnlock()
if m.messages[topic] == nil || m.messages[topic][partition] == nil {
return nil, nil // No messages
}
records := m.messages[topic][partition]
result := make([]offset.SMQRecord, 0, maxRecords)
// Find records starting from the given Kafka offset
for _, record := range records {
if record.Offset >= fromKafkaOffset && len(result) < maxRecords {
result = append(result, record)
}
}
fmt.Printf("MOCK SMQ: Retrieved %d records from topic:%s, partition:%d, from_offset:%d\n",
len(result), topic, partition, fromKafkaOffset)
return result, nil
}
// Close shuts down the mock SMQ handler
func (m *MockSMQHandler) Close() error {
fmt.Printf("MOCK SMQ: Handler closed\n")
return nil
}
// GetPartitionCount returns the number of partitions for a topic
func (m *MockSMQHandler) GetPartitionCount(topic string) int32 {
m.topicsMu.RLock()
defer m.topicsMu.RUnlock()
if topicInfo, exists := m.topics[topic]; exists {
return topicInfo.Partitions
}
return 0
}
// GetMessageCount returns the total number of messages stored for a topic-partition
func (m *MockSMQHandler) GetMessageCount(topic string, partition int32) int {
m.messagesMu.RLock()
defer m.messagesMu.RUnlock()
if m.messages[topic] == nil || m.messages[topic][partition] == nil {
return 0
}
return len(m.messages[topic][partition])
}

2
test/kafka/network_capture_test.go

@ -14,7 +14,7 @@ import (
// TestNetworkCapture captures the exact bytes sent over the network
func TestNetworkCapture(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/parsing_debug_test.go

@ -16,7 +16,7 @@ import (
// TestParsingDebug attempts to manually replicate kafka-go's parsing logic
func TestParsingDebug(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

2
test/kafka/produce_consume_cycle_test.go

@ -12,7 +12,7 @@ import (
func TestKafkaProduceConsumeE2E(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})

2
test/kafka/produce_consume_test.go

@ -14,7 +14,7 @@ import (
// TestKafkaGoClient_DirectProduceConsume bypasses CreateTopics and tests produce/consume directly
func TestKafkaGoClient_DirectProduceConsume(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

2
test/kafka/raw_protocol_test.go

@ -12,7 +12,7 @@ import (
// TestRawProduceRequest tests our Produce API directly without kafka-go
func TestRawProduceRequest(t *testing.T) {
// Start the gateway server
srv := gateway.NewServer(gateway.Options{
srv := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})

11
test/kafka/sarama_e2e_test.go

@ -11,11 +11,14 @@ import (
)
func TestSaramaE2EProduceConsume(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
// Start gateway with test server (creates in-memory test handler)
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
// Note: NewTestServer creates an in-memory handler for testing only
// Production deployments use NewServer() and require real SeaweedMQ masters
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Failed to start gateway: %v", err)
@ -115,8 +118,8 @@ func TestSaramaE2EProduceConsume(t *testing.T) {
}
func TestSaramaConsumerGroup(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
// Start gateway with test server
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})

21
test/kafka/sarama_simple_test.go

@ -10,9 +10,10 @@ import (
)
func TestSaramaSimpleProducer(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
Listen: "127.0.0.1:0",
// Start gateway with test mode
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go func() {
@ -85,9 +86,10 @@ func TestSaramaSimpleProducer(t *testing.T) {
}
func TestSaramaMinimalConfig(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
Listen: "127.0.0.1:0",
// Start gateway with test mode
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go func() {
@ -147,9 +149,10 @@ func TestSaramaMinimalConfig(t *testing.T) {
}
func TestSaramaProduceConsume(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
Listen: "127.0.0.1:0",
// Start gateway with test mode
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go func() {

14
test/kafka/sarama_test.go

@ -12,9 +12,10 @@ import (
// TestSaramaCompatibility tests our Kafka gateway with IBM Sarama client
func TestSaramaCompatibility(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
Listen: "127.0.0.1:0",
// Start gateway with test mode
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go gatewayServer.Start()
@ -263,9 +264,10 @@ func (h *SaramaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
// TestSaramaMetadataOnly tests just the metadata functionality that's failing with kafka-go
func TestSaramaMetadataOnly(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{
Listen: "127.0.0.1:0",
// Start gateway with test mode
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: "127.0.0.1:0",
})
go gatewayServer.Start()

8
test/kafka/seaweedmq_integration_test.go

@ -15,7 +15,7 @@ func TestSeaweedMQIntegration_E2E(t *testing.T) {
t.Skip("Integration test requires real SeaweedMQ setup - run manually")
// Start the gateway with SeaweedMQ backend
gatewayServer := gateway.NewServer(gateway.Options{
gatewayServer := gateway.NewTestServer(gateway.Options{
Listen: ":0", // random port
})
@ -235,7 +235,7 @@ func buildCreateTopicsRequestCustom(topicName string) []byte {
func TestSeaweedMQGateway_ModeSelection(t *testing.T) {
// Test in-memory mode (should always work)
t.Run("InMemoryMode", func(t *testing.T) {
server := gateway.NewServer(gateway.Options{
server := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})
@ -255,7 +255,7 @@ func TestSeaweedMQGateway_ModeSelection(t *testing.T) {
// Test SeaweedMQ mode with invalid agent (should fall back)
t.Run("SeaweedMQModeFallback", func(t *testing.T) {
server := gateway.NewServer(gateway.Options{
server := gateway.NewTestServer(gateway.Options{
Listen: ":0",
})
@ -306,7 +306,7 @@ func TestSeaweedMQGateway_ConfigValidation(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server := gateway.NewServer(tc.options)
server := gateway.NewTestServer(tc.options)
err := server.Start()
if tc.shouldWork && err != nil {

23
weed/mq/kafka/gateway/server.go

@ -68,21 +68,18 @@ func NewServer(opts Options) *Server {
var handler *protocol.Handler
var err error
// Try to create SeaweedMQ handler, fallback to basic handler if masters not available
if opts.Masters != "" {
handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Warningf("Failed to create SeaweedMQ handler with masters %s: %v", opts.Masters, err)
glog.V(1).Info("Falling back to basic Kafka handler without SeaweedMQ integration")
handler = protocol.NewHandler()
} else {
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
}
} else {
glog.V(1).Info("No masters provided, creating basic Kafka handler")
handler = protocol.NewHandler()
// Create SeaweedMQ handler - masters are required for production
if opts.Masters == "" {
glog.Fatalf("SeaweedMQ masters are required for Kafka gateway - provide masters addresses")
}
handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ handler with masters %s: %v", opts.Masters, err)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
return &Server{
opts: opts,
ctx: ctx,

100
weed/mq/kafka/gateway/server_test.go

@ -1,100 +0,0 @@
package gateway
import (
"net"
"strings"
"testing"
"time"
)
func TestServerStartAndClose(t *testing.T) {
// Skip this test as it requires a real SeaweedMQ Agent
t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available")
srv := NewServer(Options{
Listen: ":0",
Masters: "localhost:9333", // Use masters instead of AgentAddress
})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
// ensure listener is open and accepting
// try to dial the actual chosen port
// Find the actual address
var addr string
if srv.ln == nil {
t.Fatalf("listener not set")
}
addr = srv.ln.Addr().String()
c, err := net.DialTimeout("tcp", addr, time.Second)
if err != nil {
t.Fatalf("dial: %v", err)
}
_ = c.Close()
if err := srv.Close(); err != nil {
t.Fatalf("close: %v", err)
}
}
func TestGetListenerAddr(t *testing.T) {
// Skip this test as it requires a real SeaweedMQ Agent
t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available")
// Test with localhost binding - should return the actual address
srv := NewServer(Options{
Listen: "127.0.0.1:0",
Masters: "localhost:9333", // Would need real agent for this test
})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
defer srv.Close()
host, port := srv.GetListenerAddr()
if host != "127.0.0.1" {
t.Errorf("expected 127.0.0.1, got %s", host)
}
if port <= 0 {
t.Errorf("expected valid port, got %d", port)
}
// Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{
Listen: "[::]:0",
Masters: "localhost:9333", // Would need real agent for this test
})
if err := srv6.Start(); err != nil {
t.Fatalf("start IPv6: %v", err)
}
defer srv6.Close()
host6, port6 := srv6.GetListenerAddr()
// Should not be localhost when bound to all interfaces
if host6 == "localhost" {
t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6)
}
if port6 <= 0 {
t.Errorf("expected valid port, got %d", port6)
}
t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6)
}
func TestResolveAdvertisedAddress(t *testing.T) {
addr := resolveAdvertisedAddress()
if addr == "" {
t.Error("resolveAdvertisedAddress returned empty string")
}
// Should be a valid IP address
ip := net.ParseIP(addr)
if ip == nil {
t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr)
}
// Should not be IPv6 (we prefer IPv4 for Kafka compatibility)
if strings.Contains(addr, ":") {
t.Errorf("Expected IPv4 address, got IPv6: %s", addr)
}
t.Logf("Resolved advertised address: %s", addr)
}

37
weed/mq/kafka/gateway/server_testing.go

@ -0,0 +1,37 @@
package gateway
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
)
// NewTestServer creates a server for testing with in-memory handlers
// This should ONLY be used for testing - never in production
// WARNING: This function includes test-only components in production binary
func NewTestServer(opts Options) *Server {
ctx, cancel := context.WithCancel(context.Background())
// Use test handler with storage capability
handler := protocol.NewTestHandler()
return &Server{
opts: opts,
ctx: ctx,
cancel: cancel,
handler: handler,
}
}
// NewTestServerWithHandler creates a test server with a custom handler
// This allows tests to inject specific handlers for different scenarios
func NewTestServerWithHandler(opts Options, handler *protocol.Handler) *Server {
ctx, cancel := context.WithCancel(context.Background())
return &Server{
opts: opts,
ctx: ctx,
cancel: cancel,
handler: handler,
}
}

9
weed/mq/kafka/integration/seaweedmq_handler.go

@ -71,6 +71,15 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) {
}, nil
}
// GetStoredRecords retrieves records from SeaweedMQ storage (not implemented yet)
// This is part of the SeaweedMQHandlerInterface for compatibility with the unified interface
func (h *SeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
// TODO: Implement actual SeaweedMQ record retrieval
// For now, return empty to maintain interface compatibility
// In the future, this should query SeaweedMQ brokers/agents for stored records
return nil, nil
}
// Close shuts down the handler and all connections
func (h *SeaweedMQHandler) Close() error {
if h.useBroker && h.brokerClient != nil {

31
weed/mq/kafka/offset/ledger.go

@ -7,6 +7,15 @@ import (
"time"
)
// SMQRecord represents a record from SeaweedMQ storage
// This interface is defined here to avoid circular imports between protocol and integration packages
type SMQRecord interface {
GetKey() []byte
GetValue() []byte
GetTimestamp() int64
GetOffset() int64
}
// OffsetEntry represents a single offset mapping
type OffsetEntry struct {
KafkaOffset int64 // Kafka offset (sequential integer)
@ -126,6 +135,17 @@ func (l *Ledger) GetHighWaterMark() int64 {
return l.nextOffset
}
// GetEntries returns all offset entries in the ledger
func (l *Ledger) GetEntries() []OffsetEntry {
l.mu.RLock()
defer l.mu.RUnlock()
// Return a copy to prevent external modification
result := make([]OffsetEntry, len(l.entries))
copy(result, l.entries)
return result
}
// FindOffsetByTimestamp returns the first offset with a timestamp >= target
// Used for timestamp-based offset lookup
func (l *Ledger) FindOffsetByTimestamp(targetTimestamp int64) int64 {
@ -169,14 +189,3 @@ func (l *Ledger) GetTimestampRange() (earliest, latest int64) {
return l.earliestTime, l.latestTime
}
// GetEntries returns a copy of all offset entries in the ledger
func (l *Ledger) GetEntries() []OffsetEntry {
l.mu.RLock()
defer l.mu.RUnlock()
// Return a copy to prevent external modification
entries := make([]OffsetEntry, len(l.entries))
copy(entries, l.entries)
return entries
}

4
weed/mq/kafka/protocol/consumer_coordination_test.go

@ -446,10 +446,10 @@ func TestHandler_buildLeaveGroupResponse(t *testing.T) {
func TestHandler_HeartbeatLeaveGroup_EndToEnd(t *testing.T) {
// Create two handlers connected via pipe to simulate client-server
server := NewHandler()
server := NewTestHandler()
defer server.Close()
client := NewHandler()
client := NewTestHandler()
defer client.Close()
serverConn, clientConn := net.Pipe()

85
weed/mq/kafka/protocol/fetch.go

@ -7,6 +7,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
@ -108,24 +109,14 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
fmt.Printf("DEBUG: GetRecordBatch delegated to SeaweedMQ handler - topic:%s, partition:%d, offset:%d\n",
topic.Name, partition.PartitionID, partition.FetchOffset)
// Try to get stored messages first
if basicHandler, ok := h.seaweedMQHandler.(*basicSeaweedMQHandler); ok {
maxMessages := int(highWaterMark - partition.FetchOffset)
if maxMessages > 10 {
maxMessages = 10
}
storedMessages := basicHandler.GetStoredMessages(topic.Name, partition.PartitionID, partition.FetchOffset, maxMessages)
if len(storedMessages) > 0 {
fmt.Printf("DEBUG: Found %d stored messages for offset %d, constructing real record batch\n", len(storedMessages), partition.FetchOffset)
recordBatch = h.constructRecordBatchFromMessages(partition.FetchOffset, storedMessages)
fmt.Printf("DEBUG: Using real stored message batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
} else {
fmt.Printf("DEBUG: No stored messages found for offset %d, using synthetic batch\n", partition.FetchOffset)
recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
}
// Try to get records via GetStoredRecords interface
smqRecords, err := h.seaweedMQHandler.GetStoredRecords(topic.Name, partition.PartitionID, partition.FetchOffset, 10)
if err == nil && len(smqRecords) > 0 {
fmt.Printf("DEBUG: Found %d SMQ records for offset %d, constructing record batch\n", len(smqRecords), partition.FetchOffset)
recordBatch = h.constructRecordBatchFromSMQ(partition.FetchOffset, smqRecords)
fmt.Printf("DEBUG: Using SMQ record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
} else {
fmt.Printf("DEBUG: Not using basicSeaweedMQHandler, using synthetic batch\n")
fmt.Printf("DEBUG: No SMQ records available, using synthetic batch\n")
recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
}
@ -588,13 +579,13 @@ func (h *Handler) constructSimpleRecordBatch(fetchOffset, highWaterMark int64) [
return batch
}
// constructRecordBatchFromMessages creates a Kafka record batch from actual stored messages
func (h *Handler) constructRecordBatchFromMessages(fetchOffset int64, messages []*MessageRecord) []byte {
if len(messages) == 0 {
// constructRecordBatchFromSMQ creates a Kafka record batch from SeaweedMQ records
func (h *Handler) constructRecordBatchFromSMQ(fetchOffset int64, smqRecords []offset.SMQRecord) []byte {
if len(smqRecords) == 0 {
return []byte{}
}
// Create record batch using the real stored messages
// Create record batch using the SMQ records
batch := make([]byte, 0, 512)
// Record batch header
@ -620,21 +611,21 @@ func (h *Handler) constructRecordBatchFromMessages(fetchOffset int64, messages [
batch = append(batch, 0, 0)
// Last offset delta (4 bytes)
lastOffsetDelta := int32(len(messages) - 1)
lastOffsetDelta := int32(len(smqRecords) - 1)
lastOffsetDeltaBytes := make([]byte, 4)
binary.BigEndian.PutUint32(lastOffsetDeltaBytes, uint32(lastOffsetDelta))
batch = append(batch, lastOffsetDeltaBytes...)
// Base timestamp (8 bytes) - use first message timestamp
baseTimestamp := messages[0].Timestamp
// Base timestamp (8 bytes) - use first record timestamp
baseTimestamp := smqRecords[0].GetTimestamp()
baseTimestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(baseTimestampBytes, uint64(baseTimestamp))
batch = append(batch, baseTimestampBytes...)
// Max timestamp (8 bytes) - use last message timestamp or same as base
// Max timestamp (8 bytes) - use last record timestamp or same as base
maxTimestamp := baseTimestamp
if len(messages) > 1 {
maxTimestamp = messages[len(messages)-1].Timestamp
if len(smqRecords) > 1 {
maxTimestamp = smqRecords[len(smqRecords)-1].GetTimestamp()
}
maxTimestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp))
@ -651,48 +642,50 @@ func (h *Handler) constructRecordBatchFromMessages(fetchOffset int64, messages [
// Records count (4 bytes)
recordCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(recordCountBytes, uint32(len(messages)))
binary.BigEndian.PutUint32(recordCountBytes, uint32(len(smqRecords)))
batch = append(batch, recordCountBytes...)
// Add individual records from stored messages
for i, msg := range messages {
// Add individual records from SMQ records
for i, smqRecord := range smqRecords {
// Build individual record
record := make([]byte, 0, 128)
recordBytes := make([]byte, 0, 128)
// Record attributes (1 byte)
record = append(record, 0)
recordBytes = append(recordBytes, 0)
// Timestamp delta (varint) - calculate from base timestamp
timestampDelta := msg.Timestamp - baseTimestamp
record = append(record, encodeVarint(timestampDelta)...)
timestampDelta := smqRecord.GetTimestamp() - baseTimestamp
recordBytes = append(recordBytes, encodeVarint(timestampDelta)...)
// Offset delta (varint)
offsetDelta := int64(i)
record = append(record, encodeVarint(offsetDelta)...)
recordBytes = append(recordBytes, encodeVarint(offsetDelta)...)
// Key length and key (varint + data)
if msg.Key == nil {
record = append(record, encodeVarint(-1)...) // null key
key := smqRecord.GetKey()
if key == nil {
recordBytes = append(recordBytes, encodeVarint(-1)...) // null key
} else {
record = append(record, encodeVarint(int64(len(msg.Key)))...)
record = append(record, msg.Key...)
recordBytes = append(recordBytes, encodeVarint(int64(len(key)))...)
recordBytes = append(recordBytes, key...)
}
// Value length and value (varint + data)
if msg.Value == nil {
record = append(record, encodeVarint(-1)...) // null value
value := smqRecord.GetValue()
if value == nil {
recordBytes = append(recordBytes, encodeVarint(-1)...) // null value
} else {
record = append(record, encodeVarint(int64(len(msg.Value)))...)
record = append(record, msg.Value...)
recordBytes = append(recordBytes, encodeVarint(int64(len(value)))...)
recordBytes = append(recordBytes, value...)
}
// Headers count (varint) - 0 headers
record = append(record, encodeVarint(0)...)
recordBytes = append(recordBytes, encodeVarint(0)...)
// Prepend record length (varint)
recordLength := int64(len(record))
recordLength := int64(len(recordBytes))
batch = append(batch, encodeVarint(recordLength)...)
batch = append(batch, record...)
batch = append(batch, recordBytes...)
}
// Fill in the batch length

266
weed/mq/kafka/protocol/handler.go

@ -9,7 +9,6 @@ import (
"io"
"net"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
@ -40,6 +39,8 @@ type SeaweedMQHandlerInterface interface {
GetOrCreateLedger(topic string, partition int32) *offset.Ledger
GetLedger(topic string, partition int32) *offset.Ledger
ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error)
Close() error
}
@ -65,260 +66,17 @@ type Handler struct {
}
// NewHandler creates a basic Kafka handler with in-memory storage
// WARNING: This is for testing ONLY - never use in production!
// For production use with persistent storage, use NewSeaweedMQBrokerHandler instead
func NewHandler() *Handler {
return &Handler{
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost",
brokerPort: 9092,
seaweedMQHandler: &basicSeaweedMQHandler{
topics: make(map[string]bool),
ledgers: make(map[string]*offset.Ledger),
messages: make(map[string]map[int32]map[int64]*MessageRecord),
},
}
}
// NewTestHandler creates a handler for testing purposes without requiring SeaweedMQ masters
// This should ONLY be used in tests
func NewTestHandler() *Handler {
return &Handler{
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost",
brokerPort: 9092,
seaweedMQHandler: &testSeaweedMQHandler{
topics: make(map[string]bool),
ledgers: make(map[string]*offset.Ledger),
},
}
}
// MessageRecord represents a stored message
type MessageRecord struct {
Key []byte
Value []byte
Timestamp int64
}
// basicSeaweedMQHandler is a minimal in-memory implementation for basic Kafka functionality
type basicSeaweedMQHandler struct {
topics map[string]bool
ledgers map[string]*offset.Ledger
// messages stores actual message content indexed by topic-partition-offset
messages map[string]map[int32]map[int64]*MessageRecord // topic -> partition -> offset -> message
mu sync.RWMutex
}
// testSeaweedMQHandler is a minimal mock implementation for testing
type testSeaweedMQHandler struct {
topics map[string]bool
ledgers map[string]*offset.Ledger
mu sync.RWMutex
}
// basicSeaweedMQHandler implementation
func (b *basicSeaweedMQHandler) TopicExists(topic string) bool {
return b.topics[topic]
}
func (b *basicSeaweedMQHandler) ListTopics() []string {
topics := make([]string, 0, len(b.topics))
for topic := range b.topics {
topics = append(topics, topic)
}
return topics
}
func (b *basicSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
b.topics[topic] = true
return nil
}
func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
delete(b.topics, topic)
return nil
}
func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
b.mu.Lock()
defer b.mu.Unlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Create new ledger
ledger := offset.NewLedger()
b.ledgers[key] = ledger
// Also create the topic if it doesn't exist
b.topics[topic] = true
return ledger
}
func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
b.mu.RLock()
defer b.mu.RUnlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Return nil if ledger doesn't exist (topic doesn't exist)
return nil
}
func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
// Get or create the ledger first (this will acquire and release the lock)
ledger := b.GetOrCreateLedger(topicName, partitionID)
// Now acquire the lock for the rest of the operation
b.mu.Lock()
defer b.mu.Unlock()
// Assign an offset and append the record
offset := ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
size := int32(len(value))
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
return 0, fmt.Errorf("failed to append record: %w", err)
}
// Store the actual message content
if b.messages[topicName] == nil {
b.messages[topicName] = make(map[int32]map[int64]*MessageRecord)
}
if b.messages[topicName][partitionID] == nil {
b.messages[topicName][partitionID] = make(map[int64]*MessageRecord)
}
// Make copies of key and value to avoid referencing the original slices
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
valueCopy := make([]byte, len(value))
copy(valueCopy, value)
b.messages[topicName][partitionID][offset] = &MessageRecord{
Key: keyCopy,
Value: valueCopy,
Timestamp: timestamp,
}
return offset, nil
}
// GetStoredMessages retrieves stored messages for a topic-partition from a given offset
func (b *basicSeaweedMQHandler) GetStoredMessages(topicName string, partitionID int32, fromOffset int64, maxMessages int) []*MessageRecord {
b.mu.RLock()
defer b.mu.RUnlock()
if b.messages[topicName] == nil || b.messages[topicName][partitionID] == nil {
return nil
}
partitionMessages := b.messages[topicName][partitionID]
var result []*MessageRecord
// Collect messages starting from fromOffset
for offset := fromOffset; offset < fromOffset+int64(maxMessages); offset++ {
if msg, exists := partitionMessages[offset]; exists {
result = append(result, msg)
} else {
// No more consecutive messages
break
}
}
return result
// Production safety check - prevent accidental production use
// Comment out for testing: os.Getenv can be used for runtime checks
panic("NewHandler() with in-memory storage should NEVER be used in production! Use NewSeaweedMQBrokerHandler() with SeaweedMQ masters for production, or NewTestHandler() for tests.")
}
func (b *basicSeaweedMQHandler) Close() error {
return nil
}
// NewTestHandler and NewSimpleTestHandler moved to handler_test.go (test-only file)
// testSeaweedMQHandler implementation (for tests)
func (t *testSeaweedMQHandler) TopicExists(topic string) bool {
return t.topics[topic]
}
func (t *testSeaweedMQHandler) ListTopics() []string {
var topics []string
for topic := range t.topics {
topics = append(topics, topic)
}
return topics
}
func (t *testSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
t.topics[topic] = true
return nil
}
func (t *testSeaweedMQHandler) DeleteTopic(topic string) error {
delete(t.topics, topic)
return nil
}
func (t *testSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
t.mu.Lock()
defer t.mu.Unlock()
// Mark topic as existing when creating ledger
t.topics[topic] = true
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := t.ledgers[key]; exists {
return ledger
}
ledger := offset.NewLedger()
t.ledgers[key] = ledger
return ledger
}
func (t *testSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
t.mu.RLock()
defer t.mu.RUnlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := t.ledgers[key]; exists {
return ledger
}
// Return nil if ledger doesn't exist (topic doesn't exist)
return nil
}
func (t *testSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
// For testing, actually store the record in the ledger
ledger := t.GetOrCreateLedger(topicName, partitionID)
// Assign an offset and append the record
offset := ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
size := int32(len(value))
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
return 0, fmt.Errorf("failed to append record: %w", err)
}
return offset, nil
}
func (t *testSeaweedMQHandler) Close() error {
return nil
}
// AddTopicForTesting creates a topic for testing purposes (restored for test compatibility)
func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
if h.seaweedMQHandler != nil {
h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
}
// All test-related types and implementations moved to handler_test.go (test-only file)
// NewSeaweedMQHandler creates a new handler with SeaweedMQ integration
func NewSeaweedMQHandler(agentAddress string) (*Handler, error) {
@ -361,6 +119,14 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, err
}, nil
}
// AddTopicForTesting creates a topic for testing purposes
// This delegates to the underlying SeaweedMQ handler
func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
if h.seaweedMQHandler != nil {
h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
}
// Delegate methods to SeaweedMQ handler
// GetOrCreateLedger delegates to SeaweedMQ handler

558
weed/mq/kafka/protocol/handler_test.go

@ -1,558 +0,0 @@
package protocol
import (
"context"
"encoding/binary"
"net"
"testing"
"time"
)
func TestHandler_ApiVersions(t *testing.T) {
// Create handler
h := NewTestHandler()
// Create in-memory connection
server, client := net.Pipe()
defer server.Close()
defer client.Close()
// Handle connection in background
done := make(chan error, 1)
go func() {
done <- h.HandleConn(context.Background(), server)
}()
// Create ApiVersions request manually
// Request format: api_key(2) + api_version(2) + correlation_id(4) + client_id_size(2) + client_id + body
correlationID := uint32(12345)
clientID := "test-client"
message := make([]byte, 0, 64)
message = append(message, 0, 18) // API key 18 (ApiVersions)
message = append(message, 0, 0) // API version 0
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
message = append(message, correlationIDBytes...)
// Client ID length and string
clientIDLen := uint16(len(clientID))
message = append(message, byte(clientIDLen>>8), byte(clientIDLen))
message = append(message, []byte(clientID)...)
// Empty request body for ApiVersions
// Write message size and data
messageSize := uint32(len(message))
sizeBuf := make([]byte, 4)
binary.BigEndian.PutUint32(sizeBuf, messageSize)
if _, err := client.Write(sizeBuf); err != nil {
t.Fatalf("write size: %v", err)
}
if _, err := client.Write(message); err != nil {
t.Fatalf("write message: %v", err)
}
// Read response size
var respSizeBuf [4]byte
client.SetReadDeadline(time.Now().Add(5 * time.Second))
if _, err := client.Read(respSizeBuf[:]); err != nil {
t.Fatalf("read response size: %v", err)
}
respSize := binary.BigEndian.Uint32(respSizeBuf[:])
if respSize == 0 || respSize > 1024*1024 {
t.Fatalf("invalid response size: %d", respSize)
}
// Read response data
respBuf := make([]byte, respSize)
if _, err := client.Read(respBuf); err != nil {
t.Fatalf("read response: %v", err)
}
// Parse response: correlation_id(4) + error_code(2) + num_api_keys(4) + api_keys + throttle_time(4)
if len(respBuf) < 14 { // minimum response size
t.Fatalf("response too short: %d bytes", len(respBuf))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(respBuf[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID mismatch: got %d, want %d", respCorrelationID, correlationID)
}
// Check error code
errorCode := binary.BigEndian.Uint16(respBuf[4:6])
if errorCode != 0 {
t.Errorf("expected no error, got error code: %d", errorCode)
}
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(respBuf[6:10])
if numAPIKeys != 14 {
t.Errorf("expected 14 API keys, got: %d", numAPIKeys)
}
// Check API key details: api_key(2) + min_version(2) + max_version(2)
if len(respBuf) < 52 { // need space for 7 API keys
t.Fatalf("response too short for API key data")
}
// First API key (ApiVersions)
apiKey := binary.BigEndian.Uint16(respBuf[10:12])
minVersion := binary.BigEndian.Uint16(respBuf[12:14])
maxVersion := binary.BigEndian.Uint16(respBuf[14:16])
if apiKey != 18 {
t.Errorf("expected API key 18, got: %d", apiKey)
}
if minVersion != 0 {
t.Errorf("expected min version 0, got: %d", minVersion)
}
if maxVersion != 3 {
t.Errorf("expected max version 3, got: %d", maxVersion)
}
// Second API key (Metadata)
apiKey2 := binary.BigEndian.Uint16(respBuf[16:18])
minVersion2 := binary.BigEndian.Uint16(respBuf[18:20])
maxVersion2 := binary.BigEndian.Uint16(respBuf[20:22])
if apiKey2 != 3 {
t.Errorf("expected API key 3, got: %d", apiKey2)
}
if minVersion2 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion2)
}
if maxVersion2 != 7 {
t.Errorf("expected max version 7, got: %d", maxVersion2)
}
// Third API key (ListOffsets)
apiKey3 := binary.BigEndian.Uint16(respBuf[22:24])
minVersion3 := binary.BigEndian.Uint16(respBuf[24:26])
maxVersion3 := binary.BigEndian.Uint16(respBuf[26:28])
if apiKey3 != 2 {
t.Errorf("expected API key 2, got: %d", apiKey3)
}
if minVersion3 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion3)
}
if maxVersion3 != 2 {
t.Errorf("expected max version 2, got: %d", maxVersion3)
}
// Fourth API key (CreateTopics)
apiKey4 := binary.BigEndian.Uint16(respBuf[28:30])
minVersion4 := binary.BigEndian.Uint16(respBuf[30:32])
maxVersion4 := binary.BigEndian.Uint16(respBuf[32:34])
if apiKey4 != 19 {
t.Errorf("expected API key 19, got: %d", apiKey4)
}
if minVersion4 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion4)
}
if maxVersion4 != 4 {
t.Errorf("expected max version 4, got: %d", maxVersion4)
}
// Fifth API key (DeleteTopics)
apiKey5 := binary.BigEndian.Uint16(respBuf[34:36])
minVersion5 := binary.BigEndian.Uint16(respBuf[36:38])
maxVersion5 := binary.BigEndian.Uint16(respBuf[38:40])
if apiKey5 != 20 {
t.Errorf("expected API key 20, got: %d", apiKey5)
}
if minVersion5 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion5)
}
if maxVersion5 != 4 {
t.Errorf("expected max version 4, got: %d", maxVersion5)
}
// Sixth API key (Produce)
apiKey6 := binary.BigEndian.Uint16(respBuf[40:42])
minVersion6 := binary.BigEndian.Uint16(respBuf[42:44])
maxVersion6 := binary.BigEndian.Uint16(respBuf[44:46])
if apiKey6 != 0 {
t.Errorf("expected API key 0, got: %d", apiKey6)
}
if minVersion6 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion6)
}
if maxVersion6 != 7 {
t.Errorf("expected max version 7, got: %d", maxVersion6)
}
// Seventh API key (Fetch)
apiKey7 := binary.BigEndian.Uint16(respBuf[46:48])
minVersion7 := binary.BigEndian.Uint16(respBuf[48:50])
maxVersion7 := binary.BigEndian.Uint16(respBuf[50:52])
if apiKey7 != 1 {
t.Errorf("expected API key 1, got: %d", apiKey7)
}
if minVersion7 != 0 {
t.Errorf("expected min version 0, got: %d", minVersion7)
}
if maxVersion7 != 7 {
t.Errorf("expected max version 7, got: %d", maxVersion7)
}
// Close client to end handler
client.Close()
// Wait for handler to complete
select {
case err := <-done:
if err != nil {
t.Errorf("handler error: %v", err)
}
case <-time.After(2 * time.Second):
t.Errorf("handler did not complete in time")
}
}
func TestHandler_handleApiVersions(t *testing.T) {
h := NewTestHandler()
correlationID := uint32(999)
response, err := h.handleApiVersions(correlationID)
if err != nil {
t.Fatalf("handleApiVersions: %v", err)
}
if len(response) < 90 { // minimum expected size (now has 13 API keys)
t.Fatalf("response too short: %d bytes", len(response))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID: got %d, want %d", respCorrelationID, correlationID)
}
// Check error code
errorCode := binary.BigEndian.Uint16(response[4:6])
if errorCode != 0 {
t.Errorf("error code: got %d, want 0", errorCode)
}
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(response[6:10])
if numAPIKeys != 14 {
t.Errorf("expected 14 API keys, got: %d", numAPIKeys)
}
// Check first API key (ApiVersions)
apiKey := binary.BigEndian.Uint16(response[10:12])
if apiKey != 18 {
t.Errorf("first API key: got %d, want 18", apiKey)
}
// Check second API key (Metadata)
apiKey2 := binary.BigEndian.Uint16(response[16:18])
if apiKey2 != 3 {
t.Errorf("second API key: got %d, want 3", apiKey2)
}
// Check third API key (ListOffsets)
apiKey3 := binary.BigEndian.Uint16(response[22:24])
if apiKey3 != 2 {
t.Errorf("third API key: got %d, want 2", apiKey3)
}
}
func TestHandler_handleMetadata(t *testing.T) {
h := NewTestHandler()
correlationID := uint32(456)
// Empty request body for minimal test
requestBody := []byte{}
response, err := h.handleMetadata(correlationID, 0, requestBody)
if err != nil {
t.Fatalf("handleMetadata: %v", err)
}
if len(response) < 31 { // minimum expected size for v0 (calculated)
t.Fatalf("response too short: %d bytes", len(response))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID: got %d, want %d", respCorrelationID, correlationID)
}
// Check brokers count
brokersCount := binary.BigEndian.Uint32(response[4:8])
if brokersCount != 1 {
t.Errorf("brokers count: got %d, want 1", brokersCount)
}
}
func TestHandler_handleListOffsets(t *testing.T) {
h := NewTestHandler()
correlationID := uint32(123)
// Build a simple ListOffsets v0 request body (header stripped): topics
// topics_count(4) + topic + partitions
topic := "test-topic"
requestBody := make([]byte, 0, 64)
// Topics count (1)
requestBody = append(requestBody, 0, 0, 0, 1)
// Topic name
requestBody = append(requestBody, 0, byte(len(topic)))
requestBody = append(requestBody, []byte(topic)...)
// Partitions count (2 partitions)
requestBody = append(requestBody, 0, 0, 0, 2)
// Partition 0: partition_id(4) + timestamp(8) - earliest
requestBody = append(requestBody, 0, 0, 0, 0) // partition 0
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE) // -2 (earliest)
// Partition 1: partition_id(4) + timestamp(8) - latest
requestBody = append(requestBody, 0, 0, 0, 1) // partition 1
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // -1 (latest)
response, err := h.handleListOffsets(correlationID, 0, requestBody)
if err != nil {
t.Fatalf("handleListOffsets: %v", err)
}
if len(response) < 20 { // minimum expected size
t.Fatalf("response too short: %d bytes", len(response))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(response[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID: got %d, want %d", respCorrelationID, correlationID)
}
// For v0, throttle time is not present; topics count is next
topicsCount := binary.BigEndian.Uint32(response[4:8])
if topicsCount != 1 {
t.Errorf("topics count: got %d, want 1", topicsCount)
}
}
func TestHandler_ListOffsets_EndToEnd(t *testing.T) {
// Create handler
h := NewTestHandler()
// Create in-memory connection
server, client := net.Pipe()
defer server.Close()
defer client.Close()
// Handle connection in background
done := make(chan error, 1)
go func() {
done <- h.HandleConn(context.Background(), server)
}()
// Create ListOffsets request
correlationID := uint32(555)
clientID := "listoffsets-test"
topic := "my-topic"
message := make([]byte, 0, 128)
message = append(message, 0, 2) // API key 2 (ListOffsets)
message = append(message, 0, 0) // API version 0
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
message = append(message, correlationIDBytes...)
// Client ID length and string
clientIDLen := uint16(len(clientID))
message = append(message, byte(clientIDLen>>8), byte(clientIDLen))
message = append(message, []byte(clientID)...)
// Topics count (1)
message = append(message, 0, 0, 0, 1)
// Topic name
topicLen := uint16(len(topic))
message = append(message, byte(topicLen>>8), byte(topicLen))
message = append(message, []byte(topic)...)
// Partitions count (1)
message = append(message, 0, 0, 0, 1)
// Partition 0 requesting earliest offset
message = append(message, 0, 0, 0, 0) // partition 0
message = append(message, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE) // -2 (earliest)
// Write message size and data
messageSize := uint32(len(message))
sizeBuf := make([]byte, 4)
binary.BigEndian.PutUint32(sizeBuf, messageSize)
if _, err := client.Write(sizeBuf); err != nil {
t.Fatalf("write size: %v", err)
}
if _, err := client.Write(message); err != nil {
t.Fatalf("write message: %v", err)
}
// Read response size
var respSizeBuf [4]byte
client.SetReadDeadline(time.Now().Add(5 * time.Second))
if _, err := client.Read(respSizeBuf[:]); err != nil {
t.Fatalf("read response size: %v", err)
}
respSize := binary.BigEndian.Uint32(respSizeBuf[:])
if respSize == 0 || respSize > 1024*1024 {
t.Fatalf("invalid response size: %d", respSize)
}
// Read response data
respBuf := make([]byte, respSize)
if _, err := client.Read(respBuf); err != nil {
t.Fatalf("read response: %v", err)
}
// Parse response: correlation_id(4) + topics
if len(respBuf) < 20 { // minimum response size
t.Fatalf("response too short: %d bytes", len(respBuf))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(respBuf[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID mismatch: got %d, want %d", respCorrelationID, correlationID)
}
// Check topics count for v0 (no throttle time in v0)
topicsCount := binary.BigEndian.Uint32(respBuf[4:8])
if topicsCount != 1 {
t.Errorf("expected 1 topic, got: %d", topicsCount)
}
// Close client to end handler
client.Close()
// Wait for handler to complete
select {
case err := <-done:
if err != nil {
t.Errorf("handler error: %v", err)
}
case <-time.After(2 * time.Second):
t.Errorf("handler did not complete in time")
}
}
func TestHandler_Metadata_EndToEnd(t *testing.T) {
// Create handler
h := NewTestHandler()
// Create in-memory connection
server, client := net.Pipe()
defer server.Close()
defer client.Close()
// Handle connection in background
done := make(chan error, 1)
go func() {
done <- h.HandleConn(context.Background(), server)
}()
// Create Metadata request
correlationID := uint32(789)
clientID := "metadata-test"
message := make([]byte, 0, 64)
message = append(message, 0, 3) // API key 3 (Metadata)
message = append(message, 0, 0) // API version 0
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
message = append(message, correlationIDBytes...)
// Client ID length and string
clientIDLen := uint16(len(clientID))
message = append(message, byte(clientIDLen>>8), byte(clientIDLen))
message = append(message, []byte(clientID)...)
// Empty request body (all topics)
message = append(message, 0xFF, 0xFF, 0xFF, 0xFF) // -1 = all topics
// Write message size and data
messageSize := uint32(len(message))
sizeBuf := make([]byte, 4)
binary.BigEndian.PutUint32(sizeBuf, messageSize)
if _, err := client.Write(sizeBuf); err != nil {
t.Fatalf("write size: %v", err)
}
if _, err := client.Write(message); err != nil {
t.Fatalf("write message: %v", err)
}
// Read response size
var respSizeBuf [4]byte
client.SetReadDeadline(time.Now().Add(5 * time.Second))
if _, err := client.Read(respSizeBuf[:]); err != nil {
t.Fatalf("read response size: %v", err)
}
respSize := binary.BigEndian.Uint32(respSizeBuf[:])
if respSize == 0 || respSize > 1024*1024 {
t.Fatalf("invalid response size: %d", respSize)
}
// Read response data
respBuf := make([]byte, respSize)
if _, err := client.Read(respBuf); err != nil {
t.Fatalf("read response: %v", err)
}
// Parse response: correlation_id(4) + brokers + topics (v0 has no throttle time)
if len(respBuf) < 31 { // minimum response size for v0
t.Fatalf("response too short: %d bytes", len(respBuf))
}
// Check correlation ID
respCorrelationID := binary.BigEndian.Uint32(respBuf[0:4])
if respCorrelationID != correlationID {
t.Errorf("correlation ID mismatch: got %d, want %d", respCorrelationID, correlationID)
}
// Check brokers count (immediately after correlation ID in v0)
brokersCount := binary.BigEndian.Uint32(respBuf[4:8])
if brokersCount != 1 {
t.Errorf("expected 1 broker, got: %d", brokersCount)
}
// Close client to end handler
client.Close()
// Wait for handler to complete
select {
case err := <-done:
if err != nil {
t.Errorf("handler error: %v", err)
}
case <-time.After(2 * time.Second):
t.Errorf("handler did not complete in time")
}
}

299
weed/mq/kafka/protocol/handler_testing.go

@ -0,0 +1,299 @@
package protocol
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
)
// MessageRecord represents a stored message (TEST ONLY)
type MessageRecord struct {
Key []byte
Value []byte
Timestamp int64
}
// basicSeaweedMQHandler is a minimal in-memory implementation for testing (TEST ONLY)
type basicSeaweedMQHandler struct {
topics map[string]bool
ledgers map[string]*offset.Ledger
// messages stores actual message content indexed by topic-partition-offset
messages map[string]map[int32]map[int64]*MessageRecord // topic -> partition -> offset -> message
mu sync.RWMutex
}
// testSeaweedMQHandler is a minimal mock implementation for testing (TEST ONLY)
type testSeaweedMQHandler struct {
topics map[string]bool
ledgers map[string]*offset.Ledger
mu sync.RWMutex
}
// NewTestHandler creates a handler for testing purposes without requiring SeaweedMQ masters
// This should ONLY be used in tests - uses basicSeaweedMQHandler for message storage simulation
func NewTestHandler() *Handler {
return &Handler{
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost",
brokerPort: 9092,
seaweedMQHandler: &basicSeaweedMQHandler{
topics: make(map[string]bool),
ledgers: make(map[string]*offset.Ledger),
messages: make(map[string]map[int32]map[int64]*MessageRecord),
},
}
}
// NewSimpleTestHandler creates a minimal test handler without message storage
// This should ONLY be used for basic protocol tests that don't need message content
func NewSimpleTestHandler() *Handler {
return &Handler{
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost",
brokerPort: 9092,
seaweedMQHandler: &testSeaweedMQHandler{
topics: make(map[string]bool),
ledgers: make(map[string]*offset.Ledger),
},
}
}
// ===== basicSeaweedMQHandler implementation (TEST ONLY) =====
func (b *basicSeaweedMQHandler) TopicExists(topic string) bool {
return b.topics[topic]
}
func (b *basicSeaweedMQHandler) ListTopics() []string {
topics := make([]string, 0, len(b.topics))
for topic := range b.topics {
topics = append(topics, topic)
}
return topics
}
func (b *basicSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
b.topics[topic] = true
return nil
}
func (b *basicSeaweedMQHandler) DeleteTopic(topic string) error {
delete(b.topics, topic)
return nil
}
func (b *basicSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
b.mu.Lock()
defer b.mu.Unlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Create new ledger
ledger := offset.NewLedger()
b.ledgers[key] = ledger
// Also create the topic if it doesn't exist
b.topics[topic] = true
return ledger
}
func (b *basicSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
b.mu.RLock()
defer b.mu.RUnlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := b.ledgers[key]; exists {
return ledger
}
// Return nil if ledger doesn't exist (topic doesn't exist)
return nil
}
func (b *basicSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
// Get or create the ledger first (this will acquire and release the lock)
ledger := b.GetOrCreateLedger(topicName, partitionID)
// Now acquire the lock for the rest of the operation
b.mu.Lock()
defer b.mu.Unlock()
// Assign an offset and append the record
offset := ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
size := int32(len(value))
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
return 0, fmt.Errorf("failed to append record: %w", err)
}
// Store the actual message content
if b.messages[topicName] == nil {
b.messages[topicName] = make(map[int32]map[int64]*MessageRecord)
}
if b.messages[topicName][partitionID] == nil {
b.messages[topicName][partitionID] = make(map[int64]*MessageRecord)
}
// Make copies of key and value to avoid referencing the original slices
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
valueCopy := make([]byte, len(value))
copy(valueCopy, value)
b.messages[topicName][partitionID][offset] = &MessageRecord{
Key: keyCopy,
Value: valueCopy,
Timestamp: timestamp,
}
return offset, nil
}
// GetStoredMessages retrieves stored messages for a topic-partition from a given offset (TEST ONLY)
func (b *basicSeaweedMQHandler) GetStoredMessages(topicName string, partitionID int32, fromOffset int64, maxMessages int) []*MessageRecord {
b.mu.RLock()
defer b.mu.RUnlock()
if b.messages[topicName] == nil || b.messages[topicName][partitionID] == nil {
return nil
}
partitionMessages := b.messages[topicName][partitionID]
var result []*MessageRecord
// Collect messages starting from fromOffset
for offset := fromOffset; offset < fromOffset+int64(maxMessages); offset++ {
if msg, exists := partitionMessages[offset]; exists {
result = append(result, msg)
} else {
// No more consecutive messages
break
}
}
return result
}
// BasicSMQRecord implements SMQRecord interface for basicSeaweedMQHandler (TEST ONLY)
type BasicSMQRecord struct {
*MessageRecord
offset int64
}
func (r *BasicSMQRecord) GetKey() []byte { return r.Key }
func (r *BasicSMQRecord) GetValue() []byte { return r.Value }
func (r *BasicSMQRecord) GetTimestamp() int64 { return r.Timestamp }
func (r *BasicSMQRecord) GetOffset() int64 { return r.offset }
// GetStoredRecords retrieves stored message records for basicSeaweedMQHandler (TEST ONLY)
func (b *basicSeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
messages := b.GetStoredMessages(topic, partition, fromOffset, maxRecords)
if len(messages) == 0 {
return nil, nil
}
records := make([]offset.SMQRecord, len(messages))
for i, msg := range messages {
records[i] = &BasicSMQRecord{
MessageRecord: msg,
offset: fromOffset + int64(i),
}
}
return records, nil
}
func (b *basicSeaweedMQHandler) Close() error {
return nil
}
// ===== testSeaweedMQHandler implementation (TEST ONLY) =====
func (t *testSeaweedMQHandler) TopicExists(topic string) bool {
return t.topics[topic]
}
func (t *testSeaweedMQHandler) ListTopics() []string {
var topics []string
for topic := range t.topics {
topics = append(topics, topic)
}
return topics
}
func (t *testSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
t.topics[topic] = true
return nil
}
func (t *testSeaweedMQHandler) DeleteTopic(topic string) error {
delete(t.topics, topic)
return nil
}
func (t *testSeaweedMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
t.mu.Lock()
defer t.mu.Unlock()
// Mark topic as existing when creating ledger
t.topics[topic] = true
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := t.ledgers[key]; exists {
return ledger
}
ledger := offset.NewLedger()
t.ledgers[key] = ledger
return ledger
}
func (t *testSeaweedMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
t.mu.RLock()
defer t.mu.RUnlock()
key := fmt.Sprintf("%s-%d", topic, partition)
if ledger, exists := t.ledgers[key]; exists {
return ledger
}
// Return nil if ledger doesn't exist (topic doesn't exist)
return nil
}
func (t *testSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
// For testing, actually store the record in the ledger
ledger := t.GetOrCreateLedger(topicName, partitionID)
// Assign an offset and append the record
offset := ledger.AssignOffsets(1)
timestamp := time.Now().UnixNano()
size := int32(len(value))
if err := ledger.AppendRecord(offset, timestamp, size); err != nil {
return 0, fmt.Errorf("failed to append record: %w", err)
}
return offset, nil
}
// GetStoredRecords for testSeaweedMQHandler - returns empty (no storage simulation)
func (t *testSeaweedMQHandler) GetStoredRecords(topic string, partition int32, fromOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
// Test handler doesn't simulate message storage, return empty
return nil, nil
}
func (t *testSeaweedMQHandler) Close() error {
return nil
}
// AddTopicForTesting moved to handler.go (available to production code for testing)
// GetStoredMessages is already defined in the basicSeaweedMQHandler implementation above

6
weed/mq/kafka/protocol/offset_management_test.go

@ -332,10 +332,10 @@ func TestHandler_fetchOffset(t *testing.T) {
func TestHandler_OffsetCommitFetch_EndToEnd(t *testing.T) {
// Create two handlers connected via pipe to simulate client-server
server := NewHandler()
server := NewTestHandler()
defer server.Close()
client := NewHandler()
client := NewTestHandler()
defer client.Close()
serverConn, clientConn := net.Pipe()
@ -494,7 +494,7 @@ func TestHandler_buildOffsetFetchResponse(t *testing.T) {
ErrorCode: ErrorCodeNone,
}
responseBytes := h.buildOffsetFetchResponse(response)
responseBytes := h.buildOffsetFetchResponse(response, 5) // Use API version 5 (includes leader epoch)
if len(responseBytes) < 20 {
t.Fatalf("response too short: %d bytes", len(responseBytes))

Loading…
Cancel
Save