package integration

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// SMQPublisher handles publishing Kafka messages to SeaweedMQ with offset tracking
type SMQPublisher struct {
	brokers        []string
	grpcDialOption grpc.DialOption
	ctx            context.Context

	// Topic publishers - one per Kafka topic
	publishersLock sync.RWMutex
	publishers     map[string]*TopicPublisherWrapper

	// Offset persistence
	offsetStorage *offset.SMQOffsetStorage

	// Ledgers for offset tracking
	ledgersLock sync.RWMutex
	ledgers     map[string]*offset.PersistentLedger // key: topic-partition
}

// TopicPublisherWrapper wraps an SMQ publisher with Kafka-specific metadata
type TopicPublisherWrapper struct {
	publisher  *pub_client.TopicPublisher
	kafkaTopic string
	smqTopic   topic.Topic
	recordType *schema_pb.RecordType
	createdAt  time.Time
}

// NewSMQPublisher creates a new SMQ publisher for Kafka messages
func NewSMQPublisher(brokers []string) (*SMQPublisher, error) {
	if len(brokers) == 0 {
		return nil, fmt.Errorf("at least one broker address is required")
	}

	// Create offset storage
	// Use first broker as filer address for offset storage
	filerAddress := brokers[0]
	offsetStorage, err := offset.NewSMQOffsetStorage(filerAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to create offset storage: %w", err)
	}

	return &SMQPublisher{
		brokers:        brokers,
		grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
		ctx:            context.Background(),
		publishers:     make(map[string]*TopicPublisherWrapper),
		offsetStorage:  offsetStorage,
		ledgers:        make(map[string]*offset.PersistentLedger),
	}, nil
}

// PublishMessage publishes a Kafka message to SMQ with offset tracking
func (p *SMQPublisher) PublishMessage(
	kafkaTopic string,
	kafkaPartition int32,
	key []byte,
	value *schema_pb.RecordValue,
	recordType *schema_pb.RecordType,
) (int64, error) {
	// Get or create publisher for this topic
	publisher, err := p.getOrCreatePublisher(kafkaTopic, recordType)
	if err != nil {
		return -1, fmt.Errorf("failed to get publisher: %w", err)
	}

	// Get or create ledger for offset tracking
	ledger, err := p.getOrCreateLedger(kafkaTopic, kafkaPartition)
	if err != nil {
		return -1, fmt.Errorf("failed to get ledger: %w", err)
	}

	// Assign Kafka offset
	kafkaOffset := ledger.AssignOffsets(1)

	// Add Kafka metadata to the record
	enrichedValue := p.enrichRecordWithKafkaMetadata(value, kafkaOffset, kafkaPartition)

	// Publish to SMQ
	if err := publisher.publisher.PublishRecord(key, enrichedValue); err != nil {
		return -1, fmt.Errorf("failed to publish to SMQ: %w", err)
	}

	// Record the offset mapping
	smqTimestamp := time.Now().UnixNano()
	if err := ledger.AppendRecord(kafkaOffset, smqTimestamp, int32(len(key)+estimateRecordSize(enrichedValue))); err != nil {
		return -1, fmt.Errorf("failed to record offset mapping: %w", err)
	}

	return kafkaOffset, nil
}

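// examplePublishMessageUsage is an illustrative sketch of how a caller might drive
// SMQPublisher end to end. The broker address, topic name, message key, and record
// fields below are placeholders invented for the example, not values the gateway
// requires.
func examplePublishMessageUsage() error {
	publisher, err := NewSMQPublisher([]string{"localhost:17777"})
	if err != nil {
		return err
	}
	defer publisher.Close()

	// A minimal one-field record and its matching record type.
	recordType := &schema_pb.RecordType{
		Fields: []*schema_pb.Field{
			{
				Name:       "amount",
				FieldIndex: 0,
				Type: &schema_pb.Type{
					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
				},
				IsRequired: true,
			},
		},
	}
	value := &schema_pb.RecordValue{
		Fields: map[string]*schema_pb.Value{
			"amount": {Kind: &schema_pb.Value_Int64Value{Int64Value: 42}},
		},
	}

	// Publish to Kafka topic "orders", partition 0; the returned value is the
	// Kafka offset assigned by the per-partition ledger.
	assignedOffset, err := publisher.PublishMessage("orders", 0, []byte("order-1"), value, recordType)
	if err != nil {
		return err
	}
	fmt.Printf("published at Kafka offset %d\n", assignedOffset)
	return nil
}
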
// getOrCreatePublisher gets or creates an SMQ publisher for the given Kafka topic
func (p *SMQPublisher) getOrCreatePublisher(kafkaTopic string, recordType *schema_pb.RecordType) (*TopicPublisherWrapper, error) {
	p.publishersLock.RLock()
	if publisher, exists := p.publishers[kafkaTopic]; exists {
		p.publishersLock.RUnlock()
		return publisher, nil
	}
	p.publishersLock.RUnlock()

	p.publishersLock.Lock()
	defer p.publishersLock.Unlock()

	// Double-check after acquiring write lock
	if publisher, exists := p.publishers[kafkaTopic]; exists {
		return publisher, nil
	}

	// Create SMQ topic name (namespace: kafka, name: original topic)
	smqTopic := topic.NewTopic("kafka", kafkaTopic)

	// Enhance record type with Kafka metadata fields
	enhancedRecordType := p.enhanceRecordTypeWithKafkaMetadata(recordType)

	// Create SMQ publisher
	publisher, err := pub_client.NewTopicPublisher(&pub_client.PublisherConfiguration{
		Topic:          smqTopic,
		PartitionCount: 16, // Use multiple partitions for better distribution
		Brokers:        p.brokers,
		PublisherName:  fmt.Sprintf("kafka-gateway-%s", kafkaTopic),
		RecordType:     enhancedRecordType,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create SMQ publisher: %w", err)
	}

	wrapper := &TopicPublisherWrapper{
		publisher:  publisher,
		kafkaTopic: kafkaTopic,
		smqTopic:   smqTopic,
		recordType: enhancedRecordType,
		createdAt:  time.Now(),
	}

	p.publishers[kafkaTopic] = wrapper
	return wrapper, nil
}

// getOrCreateLedger gets or creates a persistent ledger for offset tracking
func (p *SMQPublisher) getOrCreateLedger(kafkaTopic string, partition int32) (*offset.PersistentLedger, error) {
	key := fmt.Sprintf("%s-%d", kafkaTopic, partition)

	p.ledgersLock.RLock()
	if ledger, exists := p.ledgers[key]; exists {
		p.ledgersLock.RUnlock()
		return ledger, nil
	}
	p.ledgersLock.RUnlock()

	p.ledgersLock.Lock()
	defer p.ledgersLock.Unlock()

	// Double-check after acquiring write lock
	if ledger, exists := p.ledgers[key]; exists {
		return ledger, nil
	}

	// Create persistent ledger
	ledger := offset.NewPersistentLedger(key, p.offsetStorage)

	p.ledgers[key] = ledger
	return ledger, nil
}

// enhanceRecordTypeWithKafkaMetadata adds Kafka-specific fields to the record type
func (p *SMQPublisher) enhanceRecordTypeWithKafkaMetadata(originalType *schema_pb.RecordType) *schema_pb.RecordType {
	if originalType == nil {
		originalType = &schema_pb.RecordType{}
	}

	// Create enhanced record type with Kafka metadata
	enhanced := &schema_pb.RecordType{
		Fields: make([]*schema_pb.Field, 0, len(originalType.Fields)+3),
	}

	// Copy original fields
	for _, field := range originalType.Fields {
		enhanced.Fields = append(enhanced.Fields, field)
	}

	// Add Kafka metadata fields
	nextIndex := int32(len(originalType.Fields))

	enhanced.Fields = append(enhanced.Fields, &schema_pb.Field{
		Name:       "_kafka_offset",
		FieldIndex: nextIndex,
		Type: &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
		},
		IsRequired: true,
		IsRepeated: false,
	})
	nextIndex++

	enhanced.Fields = append(enhanced.Fields, &schema_pb.Field{
		Name:       "_kafka_partition",
		FieldIndex: nextIndex,
		Type: &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32},
		},
		IsRequired: true,
		IsRepeated: false,
	})
	nextIndex++

	enhanced.Fields = append(enhanced.Fields, &schema_pb.Field{
		Name:       "_kafka_timestamp",
		FieldIndex: nextIndex,
		Type: &schema_pb.Type{
			Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
		},
		IsRequired: true,
		IsRepeated: false,
	})

	return enhanced
}

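// For reference, enhanceRecordTypeWithKafkaMetadata appends the metadata fields after
// the original ones. An original record type with two fields would come out with the
// following layout (the non-underscore field names are illustrative):
//
//	user             (original field, keeps its FieldIndex)
//	amount           (original field, keeps its FieldIndex)
//	_kafka_offset    INT64, FieldIndex 2
//	_kafka_partition INT32, FieldIndex 3
//	_kafka_timestamp INT64, FieldIndex 4
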
// enrichRecordWithKafkaMetadata adds Kafka metadata to the record value
func (p *SMQPublisher) enrichRecordWithKafkaMetadata(
	originalValue *schema_pb.RecordValue,
	kafkaOffset int64,
	kafkaPartition int32,
) *schema_pb.RecordValue {
	if originalValue == nil {
		originalValue = &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
	}

	// Create enhanced record value
	enhanced := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Copy original fields
	for key, value := range originalValue.Fields {
		enhanced.Fields[key] = value
	}

	// Add Kafka metadata
	enhanced.Fields["_kafka_offset"] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: kafkaOffset},
	}

	enhanced.Fields["_kafka_partition"] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int32Value{Int32Value: kafkaPartition},
	}

	enhanced.Fields["_kafka_timestamp"] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: time.Now().UnixNano()},
	}

	return enhanced
}

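// Note: the copy performed by enrichRecordWithKafkaMetadata is shallow; the enriched
// record shares *schema_pb.Value pointers with the original, so the original record's
// values should be treated as read-only once handed to PublishMessage.
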
// GetLedger returns the ledger for a topic-partition, or nil if nothing has been
// published to that partition through this publisher yet.
func (p *SMQPublisher) GetLedger(kafkaTopic string, partition int32) *offset.PersistentLedger {
	key := fmt.Sprintf("%s-%d", kafkaTopic, partition)

	p.ledgersLock.RLock()
	defer p.ledgersLock.RUnlock()

	return p.ledgers[key]
}

// Close shuts down all publishers and storage
func (p *SMQPublisher) Close() error {
	var lastErr error

	// Close all publishers
	p.publishersLock.Lock()
	for _, wrapper := range p.publishers {
		if err := wrapper.publisher.Shutdown(); err != nil {
			lastErr = err
		}
	}
	p.publishers = make(map[string]*TopicPublisherWrapper)
	p.publishersLock.Unlock()

	// Close offset storage
	if err := p.offsetStorage.Close(); err != nil {
		lastErr = err
	}

	return lastErr
}

// estimateRecordSize estimates the size of a RecordValue in bytes
func estimateRecordSize(record *schema_pb.RecordValue) int {
	if record == nil {
		return 0
	}

	size := 0
	for key, value := range record.Fields {
		size += len(key) + 8 // Key + overhead

		switch v := value.Kind.(type) {
		case *schema_pb.Value_StringValue:
			size += len(v.StringValue)
		case *schema_pb.Value_BytesValue:
			size += len(v.BytesValue)
		case *schema_pb.Value_Int32Value, *schema_pb.Value_FloatValue:
			size += 4
		case *schema_pb.Value_Int64Value, *schema_pb.Value_DoubleValue:
			size += 8
		case *schema_pb.Value_BoolValue:
			size += 1
		default:
			size += 16 // Estimate for complex types
		}
	}

	return size
}

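// Worked example: a record holding the single string field "name" = "abc" is
// estimated at len("name") + 8 + len("abc") = 4 + 8 + 3 = 15 bytes.
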
// GetTopicStats returns statistics for a Kafka topic
func (p *SMQPublisher) GetTopicStats(kafkaTopic string) map[string]interface{} {
	stats := make(map[string]interface{})

	p.publishersLock.RLock()
	wrapper, exists := p.publishers[kafkaTopic]
	p.publishersLock.RUnlock()

	if !exists {
		stats["exists"] = false
		return stats
	}

	stats["exists"] = true
	stats["smq_topic"] = wrapper.smqTopic.String()
	stats["created_at"] = wrapper.createdAt
	stats["record_type_fields"] = len(wrapper.recordType.Fields)

	// Collect partition stats
	partitionStats := make(map[string]interface{})
	p.ledgersLock.RLock()
	for key, ledger := range p.ledgers {
		// Ledger keys have the form "<topic>-<partition>"; match on "<topic>-" so a
		// topic whose name is a prefix of another topic does not pick up its ledgers.
		if len(key) > len(kafkaTopic) && key[:len(kafkaTopic)+1] == kafkaTopic+"-" {
			partitionStats[key] = map[string]interface{}{
				"high_water_mark": ledger.GetHighWaterMark(),
				"earliest_offset": ledger.GetEarliestOffset(),
				"latest_offset":   ledger.GetLatestOffset(),
				"entry_count":     len(ledger.GetEntries()),
			}
		}
	}
	p.ledgersLock.RUnlock()

	stats["partitions"] = partitionStats
	return stats
}

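// Illustrative shape of the map returned by GetTopicStats for a topic that has a
// publisher; the topic name and numbers below are invented for the example:
//
//	{
//	    "exists":             true,
//	    "smq_topic":          "<smqTopic.String()>",
//	    "created_at":         "<time the publisher wrapper was created>",
//	    "record_type_fields": 5,
//	    "partitions": {
//	        "orders-0": {"high_water_mark": 42, "earliest_offset": 0, "latest_offset": 41, "entry_count": 42},
//	    },
//	}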