Browse Source

single mode

pull/7231/head
chrislu 2 months ago
parent
commit
56aa5278af
  1. 22
      weed/mq/kafka/gateway/server.go
  2. 4
      weed/mq/kafka/integration/seaweedmq_handler.go
  3. 97
      weed/mq/kafka/protocol/handler.go
  4. 79
      weed/mq/kafka/protocol/produce.go

22
weed/mq/kafka/gateway/server.go

@ -49,7 +49,7 @@ func resolveAdvertisedAddress() string {
type Options struct {
Listen string
Masters string // SeaweedFS master servers (required for production mode, empty for testing)
Masters string // SeaweedFS master servers (required)
FilerGroup string // filer group name (optional)
}
@ -65,22 +65,14 @@ type Server struct {
func NewServer(opts Options) *Server {
ctx, cancel := context.WithCancel(context.Background())
var handler *protocol.Handler
var err error
if opts.Masters == "" {
// Use in-memory handler when no masters are configured (typical for tests)
handler = protocol.NewHandler()
glog.V(1).Infof("Created Kafka gateway with in-memory handler")
} else {
// Create broker-based SeaweedMQ handler when masters are provided
handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
// Create SeaweedMQ handler - masters required
handler, err := protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup)
if err != nil {
glog.Fatalf("Failed to create Kafka gateway handler: %v", err)
}
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
return &Server{
opts: opts,
ctx: ctx,

4
weed/mq/kafka/integration/seaweedmq_handler.go

@ -585,6 +585,10 @@ func (h *SeaweedMQHandler) convertSingleSeaweedRecord(seaweedRecord *SeaweedReco
// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration
func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*SeaweedMQHandler, error) {
if masters == "" {
return nil, fmt.Errorf("masters required - SeaweedMQ infrastructure must be configured")
}
// Parse master addresses
masterAddresses := strings.Split(masters, ",")
if len(masterAddresses) == 0 {

97
weed/mq/kafka/protocol/handler.go

@ -9,7 +9,6 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
@ -31,22 +30,10 @@ type TopicPartitionKey struct {
Partition int32
}
// Handler processes Kafka protocol requests from clients
// Handler processes Kafka protocol requests from clients using SeaweedMQ
type Handler struct {
// Legacy in-memory mode (for backward compatibility and tests)
topicsMu sync.RWMutex
topics map[string]*TopicInfo // topic name -> topic info
ledgersMu sync.RWMutex
ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger
// Record batch storage for in-memory mode (for testing)
recordBatchMu sync.RWMutex
recordBatches map[string][]byte // "topic:partition:offset" -> record batch data
// SeaweedMQ integration (optional, for production use)
// SeaweedMQ integration
seaweedMQHandler *integration.SeaweedMQHandler
useSeaweedMQ bool
// SMQ offset storage for consumer group offsets
smqOffsetStorage *offset.SMQOffsetStorage
@ -64,17 +51,9 @@ type Handler struct {
brokerPort int
}
// NewHandler creates a new handler in legacy in-memory mode
// NewHandler is deprecated - use NewSeaweedMQBrokerHandler with proper SeaweedMQ infrastructure
func NewHandler() *Handler {
return &Handler{
topics: make(map[string]*TopicInfo),
ledgers: make(map[TopicPartitionKey]*offset.Ledger),
recordBatches: make(map[string][]byte),
useSeaweedMQ: false,
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost", // default fallback
brokerPort: 9092, // default fallback
}
panic("NewHandler() deprecated - SeaweedMQ infrastructure must be configured using NewSeaweedMQBrokerHandler()")
}
// NewSeaweedMQHandler creates a new handler with SeaweedMQ integration
@ -85,16 +64,16 @@ func NewSeaweedMQHandler(agentAddress string) (*Handler, error) {
}
return &Handler{
topics: make(map[string]*TopicInfo), // Keep for compatibility
ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility
seaweedMQHandler: smqHandler,
useSeaweedMQ: true,
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost",
brokerPort: 9092,
}, nil
}
// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration
func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, error) {
// Set up SeaweedMQ integration
smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup)
if err != nil {
return nil, err
@ -110,15 +89,31 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, err
}
return &Handler{
topics: make(map[string]*TopicInfo), // Keep for compatibility
ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility
seaweedMQHandler: smqHandler,
smqOffsetStorage: smqOffsetStorage,
useSeaweedMQ: true,
groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost", // default fallback
brokerPort: 9092, // default fallback
}, nil
}
// Delegate methods to SeaweedMQ handler
// AddTopicForTesting creates a topic for testing purposes
func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
// GetOrCreateLedger delegates to SeaweedMQ handler
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
return h.seaweedMQHandler.GetOrCreateLedger(topic, partition)
}
// GetLedger delegates to SeaweedMQ handler
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
return h.seaweedMQHandler.GetLedger(topic, partition)
}
// Close shuts down the handler and all connections
func (h *Handler) Close() error {
// Close group coordinator
@ -134,50 +129,12 @@ func (h *Handler) Close() error {
}
// Close SeaweedMQ handler if present
if h.useSeaweedMQ && h.seaweedMQHandler != nil {
if h.seaweedMQHandler != nil {
return h.seaweedMQHandler.Close()
}
return nil
}
// GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed
func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger {
key := TopicPartitionKey{Topic: topic, Partition: partition}
// First try to get existing ledger with read lock
h.ledgersMu.RLock()
ledger, exists := h.ledgers[key]
h.ledgersMu.RUnlock()
if exists {
return ledger
}
// Create new ledger with write lock
h.ledgersMu.Lock()
defer h.ledgersMu.Unlock()
// Double-check after acquiring write lock
if ledger, exists := h.ledgers[key]; exists {
return ledger
}
// Create and store new ledger
ledger = offset.NewLedger()
h.ledgers[key] = ledger
return ledger
}
// GetLedger returns the offset ledger for a topic-partition, or nil if not found
func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
key := TopicPartitionKey{Topic: topic, Partition: partition}
h.ledgersMu.RLock()
defer h.ledgersMu.RUnlock()
return h.ledgers[key]
}
// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset)

79
weed/mq/kafka/protocol/produce.go

@ -179,30 +179,12 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
if parseErr != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
if h.useSeaweedMQ {
// Use SeaweedMQ integration for production
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
} else {
baseOffset = offset
}
// Use SeaweedMQ integration
offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
} else {
// Use legacy in-memory mode for tests
ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
fmt.Printf("DEBUG: Before AssignOffsets - HWM: %d, recordCount: %d\n", ledger.GetHighWaterMark(), recordCount)
baseOffset = ledger.AssignOffsets(int64(recordCount))
fmt.Printf("DEBUG: After AssignOffsets - HWM: %d, baseOffset: %d\n", ledger.GetHighWaterMark(), baseOffset)
// Append each record to the ledger
avgSize := totalSize / recordCount
for k := int64(0); k < int64(recordCount); k++ {
err := ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize)
if err != nil {
fmt.Printf("DEBUG: AppendRecord error: %v\n", err)
}
}
fmt.Printf("DEBUG: After AppendRecord - HWM: %d, entries: %d\n", ledger.GetHighWaterMark(), len(ledger.GetEntries()))
baseOffset = offset
}
}
}
@ -472,30 +454,12 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
if parseErr != nil {
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
if h.useSeaweedMQ {
// Use SeaweedMQ integration for production
offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
} else {
baseOffset = offsetVal
}
// Use SeaweedMQ integration
offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
if err != nil {
errorCode = 1 // UNKNOWN_SERVER_ERROR
} else {
// Use legacy in-memory mode for tests
ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
fmt.Printf("DEBUG: Produce v%d Before AssignOffsets - HWM: %d, recordCount: %d\n", apiVersion, ledger.GetHighWaterMark(), recordCount)
baseOffset = ledger.AssignOffsets(int64(recordCount))
fmt.Printf("DEBUG: Produce v%d After AssignOffsets - HWM: %d, baseOffset: %d\n", apiVersion, ledger.GetHighWaterMark(), baseOffset)
// Store the actual record batch data for Fetch operations
h.StoreRecordBatch(topicName, int32(partitionID), baseOffset, recordSetData)
// Append each record to the ledger
avgSize := totalSize / recordCount
for k := int64(0); k < int64(recordCount); k++ {
_ = ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize)
}
fmt.Printf("DEBUG: Produce v%d After AppendRecord - HWM: %d, entries: %d\n", apiVersion, ledger.GetHighWaterMark(), len(ledger.GetEntries()))
baseOffset = offsetVal
}
}
}
@ -571,17 +535,8 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32,
fmt.Printf("DEBUG: Successfully decoded message with schema ID %d, format %s, subject %s\n",
decodedMsg.SchemaID, decodedMsg.SchemaFormat, decodedMsg.Subject)
// If SeaweedMQ integration is enabled, store the decoded message
if h.useSeaweedMQ && h.seaweedMQHandler != nil {
return h.storeDecodedMessage(topicName, partitionID, decodedMsg)
}
// For in-memory mode, we could store metadata about the schema
// For now, just log the successful decoding
fmt.Printf("DEBUG: Schema decoding successful - would store RecordValue with %d fields\n",
len(decodedMsg.RecordValue.Fields))
return nil
// Store the decoded message using SeaweedMQ
return h.storeDecodedMessage(topicName, partitionID, decodedMsg)
}
// storeDecodedMessage stores a decoded message using mq.broker integration
@ -602,8 +557,8 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, decod
return nil
}
// Fallback to SeaweedMQ integration if available
if h.useSeaweedMQ && h.seaweedMQHandler != nil {
// Use SeaweedMQ integration
if h.seaweedMQHandler != nil {
// Extract key and value from the original envelope (simplified)
key := []byte(fmt.Sprintf("kafka-key-%d", time.Now().UnixNano()))
value := decodedMsg.Envelope.Payload
@ -618,11 +573,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, decod
return nil
}
// For in-memory mode, just log the successful decoding
fmt.Printf("DEBUG: Schema decoding successful (in-memory mode) - topic: %s, partition: %d, schema: %d, fields: %d\n",
topicName, partitionID, decodedMsg.SchemaID, len(decodedMsg.RecordValue.Fields))
return nil
return fmt.Errorf("no SeaweedMQ handler available")
}
// extractMessagesFromRecordSet extracts individual messages from a record set with compression support

Loading…
Cancel
Save