You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

293 lines
8.3 KiB

package kafka
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// MockSMQHandler provides a realistic SeaweedMQ simulation for testing.
// It behaves like the real SeaweedMQ integration but uses in-memory storage:
// all topics, records and offset ledgers live in the maps below.
type MockSMQHandler struct {
// Topic management: topicsMu guards the topics map (topic name -> metadata).
topicsMu sync.RWMutex
topics map[string]*MockTopicInfo
// Message storage - simulates SeaweedMQ's persistent storage.
// messagesMu guards the per-partition record logs.
messagesMu sync.RWMutex
messages map[string]map[int32][]*MockSMQRecord // topic -> partition -> []records
// Offset management - simulates Kafka offset ledgers.
// Ledgers are keyed by the string "<topic>-<partition>".
ledgersMu sync.RWMutex
ledgers map[string]*offset.Ledger // topic-partition -> ledger
// Simulated SMQ timestamp tracking: timestamp of the most recently stored
// record per partition (0 until a record is produced).
// NOTE(review): this map has no dedicated mutex; confirm which of the locks
// above is meant to guard it and that every accessor holds that lock.
lastTimestamp map[string]map[int32]int64 // topic -> partition -> last_timestamp
}
// MockTopicInfo represents a Kafka topic in the mock SMQ environment.
// Instances are created by MockSMQHandler.CreateTopic.
type MockTopicInfo struct {
// Name is the topic name, identical to its key in the handler's topic map.
Name string
// Partitions is the partition count requested when the topic was created.
Partitions int32
// CreatedAt is the creation time as Unix nanoseconds.
CreatedAt int64
// Schema is the schema descriptor for the topic; CreateTopic fills in only
// its Name field.
Schema *schema_pb.Topic
}
// MockSMQRecord is one entry of a partition log held in the mock SeaweedMQ
// storage. Pointers to it are handed out as offset.SMQRecord values.
type MockSMQRecord struct {
	Key       []byte // record key bytes
	Value     []byte // record payload bytes
	Timestamp int64  // SeaweedMQ timestamp (nanoseconds)
	Offset    int64  // Kafka offset for this partition
}

// The accessors below make *MockSMQRecord satisfy the SMQRecord interface.

// GetKey returns the record key.
func (r *MockSMQRecord) GetKey() []byte {
	return r.Key
}

// GetValue returns the record payload.
func (r *MockSMQRecord) GetValue() []byte {
	return r.Value
}

// GetTimestamp returns the SeaweedMQ timestamp in nanoseconds.
func (r *MockSMQRecord) GetTimestamp() int64 {
	return r.Timestamp
}

// GetOffset returns the Kafka offset of the record within its partition.
func (r *MockSMQRecord) GetOffset() int64 {
	return r.Offset
}
// NewMockSMQHandler builds an empty mock SeaweedMQ handler for tests, with
// every internal map allocated so the handler is immediately usable.
func NewMockSMQHandler() *MockSMQHandler {
	handler := new(MockSMQHandler)
	handler.topics = map[string]*MockTopicInfo{}
	handler.messages = map[string]map[int32][]*MockSMQRecord{}
	handler.ledgers = map[string]*offset.Ledger{}
	handler.lastTimestamp = map[string]map[int32]int64{}
	return handler
}
// TopicExists reports whether the named topic is registered in the mock SMQ.
func (m *MockSMQHandler) TopicExists(topic string) bool {
	m.topicsMu.RLock()
	_, found := m.topics[topic]
	m.topicsMu.RUnlock()
	return found
}
// ListTopics returns the names of all topics known to the mock SMQ.
// The order is unspecified because it follows map iteration order.
func (m *MockSMQHandler) ListTopics() []string {
	m.topicsMu.RLock()
	defer m.topicsMu.RUnlock()

	names := make([]string, len(m.topics))
	next := 0
	for topicName := range m.topics {
		names[next] = topicName
		next++
	}
	return names
}
// CreateTopic registers a new topic with the given number of partitions and
// prepares empty per-partition storage for it.
//
// It returns an error if a topic with the same name already exists.
//
// Locking: topicsMu is held for the whole call; messagesMu is taken inside it
// while the record logs and the last-timestamp entries are initialized.
func (m *MockSMQHandler) CreateTopic(topic string, partitions int32) error {
	m.topicsMu.Lock()
	defer m.topicsMu.Unlock()

	if _, exists := m.topics[topic]; exists {
		return fmt.Errorf("topic %s already exists", topic)
	}

	// Create topic info.
	m.topics[topic] = &MockTopicInfo{
		Name:       topic,
		Partitions: partitions,
		CreatedAt:  time.Now().UnixNano(),
		Schema: &schema_pb.Topic{
			Name: topic,
			// PartitionCount removed - not part of schema_pb.Topic
		},
	}

	// Initialize message storage and timestamp tracking together under
	// messagesMu. ProduceRecord updates lastTimestamp while holding messagesMu,
	// so touching that map under topicsMu alone would be a data race.
	m.messagesMu.Lock()
	m.messages[topic] = make(map[int32][]*MockSMQRecord)
	if m.lastTimestamp[topic] == nil {
		m.lastTimestamp[topic] = make(map[int32]int64)
	}
	for i := int32(0); i < partitions; i++ {
		m.messages[topic][i] = make([]*MockSMQRecord, 0, 1000)
		m.lastTimestamp[topic][i] = 0
	}
	m.messagesMu.Unlock()

	fmt.Printf("MOCK SMQ: Created topic %s with %d partitions\n", topic, partitions)
	return nil
}
// DeleteTopic removes a topic, its stored messages and its offset ledgers
// from the mock SMQ.
//
// It returns an error if the topic does not exist.
//
// Locking: topicsMu is held for the whole call; messagesMu and ledgersMu are
// taken one after the other inside it.
func (m *MockSMQHandler) DeleteTopic(topic string) error {
	m.topicsMu.Lock()
	defer m.topicsMu.Unlock()

	if _, exists := m.topics[topic]; !exists {
		return fmt.Errorf("topic %s does not exist", topic)
	}
	delete(m.topics, topic)

	// Clean up messages.
	m.messagesMu.Lock()
	delete(m.messages, topic)
	m.messagesMu.Unlock()

	// Clean up ledgers. Deleting entries while ranging over a map is safe in
	// Go, so no intermediate key list is needed.
	m.ledgersMu.Lock()
	for key := range m.ledgers {
		if mockLedgerKeyMatchesTopic(key, topic) {
			delete(m.ledgers, key)
		}
	}
	m.ledgersMu.Unlock()

	fmt.Printf("MOCK SMQ: Deleted topic %s\n", topic)
	return nil
}

// mockLedgerKeyMatchesTopic reports whether key has the form
// "<topic>-<partition>", where <partition> is a decimal integer as produced
// by the "%s-%d" ledger key format.
//
// The length check prevents slicing past the end of keys shorter than the
// topic name, and the numeric-suffix check keeps ledgers of other topics that
// merely share the prefix (e.g. "foo-bar-0" when deleting "foo").
func mockLedgerKeyMatchesTopic(key, topic string) bool {
	prefixLen := len(topic) + 1
	if len(key) <= prefixLen || key[:len(topic)] != topic || key[len(topic)] != '-' {
		return false
	}
	digits := key[prefixLen:]
	if digits[0] == '-' { // "%d" renders negative partitions with a sign
		digits = digits[1:]
	}
	if len(digits) == 0 {
		return false
	}
	for i := 0; i < len(digits); i++ {
		if digits[i] < '0' || digits[i] > '9' {
			return false
		}
	}
	return true
}
// GetOrCreateLedger returns the Kafka offset ledger for topic/partition,
// creating and registering a new one if none exists yet.
//
// When a new ledger is created and the topic is not registered, the topic is
// created with partition+1 partitions so that the requested partition exists.
//
// Locking: ledgersMu is released before any topic state is touched.
// DeleteTopic acquires topicsMu before ledgersMu, so calling TopicExists or
// CreateTopic while still holding ledgersMu would invert that order and could
// deadlock.
func (m *MockSMQHandler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
	key := fmt.Sprintf("%s-%d", topic, partition)

	m.ledgersMu.Lock()
	ledger, exists := m.ledgers[key]
	if !exists {
		ledger = offset.NewLedger()
		m.ledgers[key] = ledger
	}
	m.ledgersMu.Unlock()

	if exists {
		return ledger
	}

	// Ensure topic is created, with enough partitions to cover this one.
	if !m.TopicExists(topic) {
		// Best effort: a concurrent caller may create the topic between the
		// check and this call, in which case the topic exists anyway and the
		// returned error carries no actionable information here.
		_ = m.CreateTopic(topic, partition+1)
	}
	return ledger
}
// GetLedger looks up the ledger registered for topic/partition and returns
// nil when there is none; it never creates a ledger.
func (m *MockSMQHandler) GetLedger(topic string, partition int32) *offset.Ledger {
	ledgerKey := fmt.Sprintf("%s-%d", topic, partition)

	m.ledgersMu.RLock()
	found := m.ledgers[ledgerKey]
	m.ledgersMu.RUnlock()

	return found
}
// ProduceRecord publishes a record to the mock SeaweedMQ and updates Kafka
// offset tracking.
//
// The record is stamped with the current time in Unix nanoseconds, assigned
// the next offset from the partition's ledger (created on demand), recorded in
// that ledger, and appended to the in-memory partition log. Key and value are
// copied, so the caller may reuse its buffers afterwards.
//
// It returns the assigned Kafka offset, or an error if the topic does not
// exist or the ledger rejects the entry.
func (m *MockSMQHandler) ProduceRecord(topic string, partition int32, key, value []byte) (int64, error) {
	// Verify topic exists.
	if !m.TopicExists(topic) {
		return 0, fmt.Errorf("topic %s does not exist", topic)
	}

	// Get current timestamp (simulate SeaweedMQ timestamp).
	timestamp := time.Now().UnixNano()

	// Get or create Kafka offset ledger.
	ledger := m.GetOrCreateLedger(topic, partition)

	// Assign Kafka offset.
	kafkaOffset := ledger.AssignOffsets(1)

	// Record in ledger (simulates Kafka offset -> SMQ timestamp mapping).
	messageSize := int32(len(value))
	if err := ledger.AppendRecord(kafkaOffset, timestamp, messageSize); err != nil {
		return 0, fmt.Errorf("failed to append to ledger: %w", err)
	}

	// Store message in mock SMQ storage (simulates persistent storage).
	m.messagesMu.Lock()
	defer m.messagesMu.Unlock()

	// Lazily create any missing per-topic state so the writes below can never
	// hit a nil map.
	if m.messages[topic] == nil {
		m.messages[topic] = make(map[int32][]*MockSMQRecord)
	}
	if m.messages[topic][partition] == nil {
		m.messages[topic][partition] = make([]*MockSMQRecord, 0, 1000)
	}
	if m.lastTimestamp[topic] == nil {
		m.lastTimestamp[topic] = make(map[int32]int64)
	}

	// Create message record (simulate SMQ storage).
	record := &MockSMQRecord{
		Key:       append([]byte(nil), key...),   // Deep copy key
		Value:     append([]byte(nil), value...), // Deep copy value
		Timestamp: timestamp,
		Offset:    kafkaOffset,
	}

	// Append to partition (simulate SMQ append-only log).
	m.messages[topic][partition] = append(m.messages[topic][partition], record)

	// Update last timestamp.
	m.lastTimestamp[topic][partition] = timestamp

	fmt.Printf("MOCK SMQ: Stored record - topic:%s, partition:%d, kafka_offset:%d, smq_timestamp:%d, key:%s, value:%s\n",
		topic, partition, kafkaOffset, timestamp, string(key), string(value))
	return kafkaOffset, nil
}
// GetStoredRecords retrieves records from mock SMQ storage starting from a
// given Kafka offset.
//
// It returns at most maxRecords records of topic/partition whose offset is
// >= fromKafkaOffset, in storage order. A nil slice is returned when the
// topic or partition has no storage; a non-positive maxRecords yields an
// empty result. The returned error is always nil.
func (m *MockSMQHandler) GetStoredRecords(topic string, partition int32, fromKafkaOffset int64, maxRecords int) ([]offset.SMQRecord, error) {
	m.messagesMu.RLock()
	defer m.messagesMu.RUnlock()

	if m.messages[topic] == nil || m.messages[topic][partition] == nil {
		return nil, nil // No messages
	}
	records := m.messages[topic][partition]

	// Clamp the limit to [0, len(records)]: a negative capacity would make
	// `make` panic, and a huge one would allocate far more than can ever be
	// returned.
	limit := maxRecords
	if limit < 0 {
		limit = 0
	}
	if limit > len(records) {
		limit = len(records)
	}

	result := make([]offset.SMQRecord, 0, limit)
	// Find records starting from the given Kafka offset, stopping as soon as
	// the limit is reached instead of scanning the rest of the log.
	for _, record := range records {
		if len(result) >= limit {
			break
		}
		if record.Offset >= fromKafkaOffset {
			result = append(result, record)
		}
	}

	fmt.Printf("MOCK SMQ: Retrieved %d records from topic:%s, partition:%d, from_offset:%d\n",
		len(result), topic, partition, fromKafkaOffset)
	return result, nil
}
// Close logs that the mock SMQ handler was closed. It releases no resources
// and always returns nil.
func (m *MockSMQHandler) Close() error {
	fmt.Print("MOCK SMQ: Handler closed\n")
	return nil
}
// GetPartitionCount reports how many partitions the topic was created with,
// or 0 when the topic is unknown.
func (m *MockSMQHandler) GetPartitionCount(topic string) int32 {
	m.topicsMu.RLock()
	defer m.topicsMu.RUnlock()

	info, found := m.topics[topic]
	if !found {
		return 0
	}
	return info.Partitions
}
// GetMessageCount reports how many records are stored for topic/partition,
// or 0 when the topic or partition has no storage.
func (m *MockSMQHandler) GetMessageCount(topic string, partition int32) int {
	m.messagesMu.RLock()
	defer m.messagesMu.RUnlock()

	partitionLogs, ok := m.messages[topic]
	if !ok {
		return 0
	}
	// Indexing a nil or missing entry yields a nil slice, whose length is 0.
	return len(partitionLogs[partition])
}