From 56aa5278affc2f17ebfa8050b5936894147d7720 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 23:13:41 -0700 Subject: [PATCH] single mode --- weed/mq/kafka/gateway/server.go | 22 ++--- .../mq/kafka/integration/seaweedmq_handler.go | 4 + weed/mq/kafka/protocol/handler.go | 97 ++++++------------- weed/mq/kafka/protocol/produce.go | 79 +++------------ 4 files changed, 53 insertions(+), 149 deletions(-) diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index 9326dd1ce..28e254310 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -49,7 +49,7 @@ func resolveAdvertisedAddress() string { type Options struct { Listen string - Masters string // SeaweedFS master servers (required for production mode, empty for testing) + Masters string // SeaweedFS master servers (required) FilerGroup string // filer group name (optional) } @@ -65,22 +65,14 @@ type Server struct { func NewServer(opts Options) *Server { ctx, cancel := context.WithCancel(context.Background()) - var handler *protocol.Handler - var err error - - if opts.Masters == "" { - // Use in-memory handler when no masters are configured (typical for tests) - handler = protocol.NewHandler() - glog.V(1).Infof("Created Kafka gateway with in-memory handler") - } else { - // Create broker-based SeaweedMQ handler when masters are provided - handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup) - if err != nil { - glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err) - } - glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters) + // Create SeaweedMQ handler - masters required + handler, err := protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup) + if err != nil { + glog.Fatalf("Failed to create Kafka gateway handler: %v", err) } + glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters) + return &Server{ opts: opts, ctx: ctx, diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 75938168c..b9b00145d 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -585,6 +585,10 @@ func (h *SeaweedMQHandler) convertSingleSeaweedRecord(seaweedRecord *SeaweedReco // NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*SeaweedMQHandler, error) { + if masters == "" { + return nil, fmt.Errorf("masters required - SeaweedMQ infrastructure must be configured") + } + // Parse master addresses masterAddresses := strings.Split(masters, ",") if len(masterAddresses) == 0 { diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 2e2325efe..0f2c4fd75 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -9,7 +9,6 @@ import ( "net" "strconv" "strings" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" @@ -31,22 +30,10 @@ type TopicPartitionKey struct { Partition int32 } -// Handler processes Kafka protocol requests from clients +// Handler processes Kafka protocol requests from clients using SeaweedMQ type Handler struct { - // Legacy in-memory mode (for backward compatibility and tests) - topicsMu sync.RWMutex - topics map[string]*TopicInfo // topic name -> topic info - - ledgersMu sync.RWMutex - ledgers map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger - - // Record batch storage for in-memory mode (for testing) - recordBatchMu sync.RWMutex - recordBatches map[string][]byte // "topic:partition:offset" -> record batch data - - // SeaweedMQ integration (optional, for production use) + // SeaweedMQ integration seaweedMQHandler *integration.SeaweedMQHandler - useSeaweedMQ bool // SMQ offset storage for consumer group offsets smqOffsetStorage *offset.SMQOffsetStorage @@ -64,17 +51,9 @@ type Handler struct { brokerPort int } -// NewHandler creates a new handler in legacy in-memory mode +// NewHandler is deprecated - use NewSeaweedMQBrokerHandler with proper SeaweedMQ infrastructure func NewHandler() *Handler { - return &Handler{ - topics: make(map[string]*TopicInfo), - ledgers: make(map[TopicPartitionKey]*offset.Ledger), - recordBatches: make(map[string][]byte), - useSeaweedMQ: false, - groupCoordinator: consumer.NewGroupCoordinator(), - brokerHost: "localhost", // default fallback - brokerPort: 9092, // default fallback - } + panic("NewHandler() deprecated - SeaweedMQ infrastructure must be configured using NewSeaweedMQBrokerHandler()") } // NewSeaweedMQHandler creates a new handler with SeaweedMQ integration @@ -85,16 +64,16 @@ func NewSeaweedMQHandler(agentAddress string) (*Handler, error) { } return &Handler{ - topics: make(map[string]*TopicInfo), // Keep for compatibility - ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility seaweedMQHandler: smqHandler, - useSeaweedMQ: true, groupCoordinator: consumer.NewGroupCoordinator(), + brokerHost: "localhost", + brokerPort: 9092, }, nil } // NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, error) { + // Set up SeaweedMQ integration smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup) if err != nil { return nil, err @@ -110,15 +89,31 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, err } return &Handler{ - topics: make(map[string]*TopicInfo), // Keep for compatibility - ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility seaweedMQHandler: smqHandler, smqOffsetStorage: smqOffsetStorage, - useSeaweedMQ: true, groupCoordinator: consumer.NewGroupCoordinator(), + brokerHost: "localhost", // default fallback + brokerPort: 9092, // default fallback }, nil } +// Delegate methods to SeaweedMQ handler + +// AddTopicForTesting creates a topic for testing purposes +func (h *Handler) AddTopicForTesting(topicName string, partitions int32) { + h.seaweedMQHandler.CreateTopic(topicName, partitions) +} + +// GetOrCreateLedger delegates to SeaweedMQ handler +func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { + return h.seaweedMQHandler.GetOrCreateLedger(topic, partition) +} + +// GetLedger delegates to SeaweedMQ handler +func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { + return h.seaweedMQHandler.GetLedger(topic, partition) +} + // Close shuts down the handler and all connections func (h *Handler) Close() error { // Close group coordinator @@ -134,50 +129,12 @@ func (h *Handler) Close() error { } // Close SeaweedMQ handler if present - if h.useSeaweedMQ && h.seaweedMQHandler != nil { + if h.seaweedMQHandler != nil { return h.seaweedMQHandler.Close() } return nil } -// GetOrCreateLedger returns the offset ledger for a topic-partition, creating it if needed -func (h *Handler) GetOrCreateLedger(topic string, partition int32) *offset.Ledger { - key := TopicPartitionKey{Topic: topic, Partition: partition} - - // First try to get existing ledger with read lock - h.ledgersMu.RLock() - ledger, exists := h.ledgers[key] - h.ledgersMu.RUnlock() - - if exists { - return ledger - } - - // Create new ledger with write lock - h.ledgersMu.Lock() - defer h.ledgersMu.Unlock() - - // Double-check after acquiring write lock - if ledger, exists := h.ledgers[key]; exists { - return ledger - } - - // Create and store new ledger - ledger = offset.NewLedger() - h.ledgers[key] = ledger - return ledger -} - -// GetLedger returns the offset ledger for a topic-partition, or nil if not found -func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { - key := TopicPartitionKey{Topic: topic, Partition: partition} - - h.ledgersMu.RLock() - defer h.ledgersMu.RUnlock() - - return h.ledgers[key] -} - // StoreRecordBatch stores a record batch for later retrieval during Fetch operations func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset) diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 7b687d32f..fefb79fbc 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -179,30 +179,12 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req if parseErr != nil { errorCode = 42 // INVALID_RECORD } else if recordCount > 0 { - if h.useSeaweedMQ { - // Use SeaweedMQ integration for production - offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData) - if err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR - } else { - baseOffset = offset - } + // Use SeaweedMQ integration + offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData) + if err != nil { + errorCode = 1 // UNKNOWN_SERVER_ERROR } else { - // Use legacy in-memory mode for tests - ledger := h.GetOrCreateLedger(topicName, int32(partitionID)) - fmt.Printf("DEBUG: Before AssignOffsets - HWM: %d, recordCount: %d\n", ledger.GetHighWaterMark(), recordCount) - baseOffset = ledger.AssignOffsets(int64(recordCount)) - fmt.Printf("DEBUG: After AssignOffsets - HWM: %d, baseOffset: %d\n", ledger.GetHighWaterMark(), baseOffset) - - // Append each record to the ledger - avgSize := totalSize / recordCount - for k := int64(0); k < int64(recordCount); k++ { - err := ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize) - if err != nil { - fmt.Printf("DEBUG: AppendRecord error: %v\n", err) - } - } - fmt.Printf("DEBUG: After AppendRecord - HWM: %d, entries: %d\n", ledger.GetHighWaterMark(), len(ledger.GetEntries())) + baseOffset = offset } } } @@ -472,30 +454,12 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r if parseErr != nil { errorCode = 42 // INVALID_RECORD } else if recordCount > 0 { - if h.useSeaweedMQ { - // Use SeaweedMQ integration for production - offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData) - if err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR - } else { - baseOffset = offsetVal - } + // Use SeaweedMQ integration + offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData) + if err != nil { + errorCode = 1 // UNKNOWN_SERVER_ERROR } else { - // Use legacy in-memory mode for tests - ledger := h.GetOrCreateLedger(topicName, int32(partitionID)) - fmt.Printf("DEBUG: Produce v%d Before AssignOffsets - HWM: %d, recordCount: %d\n", apiVersion, ledger.GetHighWaterMark(), recordCount) - baseOffset = ledger.AssignOffsets(int64(recordCount)) - fmt.Printf("DEBUG: Produce v%d After AssignOffsets - HWM: %d, baseOffset: %d\n", apiVersion, ledger.GetHighWaterMark(), baseOffset) - - // Store the actual record batch data for Fetch operations - h.StoreRecordBatch(topicName, int32(partitionID), baseOffset, recordSetData) - - // Append each record to the ledger - avgSize := totalSize / recordCount - for k := int64(0); k < int64(recordCount); k++ { - _ = ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize) - } - fmt.Printf("DEBUG: Produce v%d After AppendRecord - HWM: %d, entries: %d\n", apiVersion, ledger.GetHighWaterMark(), len(ledger.GetEntries())) + baseOffset = offsetVal } } } @@ -571,17 +535,8 @@ func (h *Handler) processSchematizedMessage(topicName string, partitionID int32, fmt.Printf("DEBUG: Successfully decoded message with schema ID %d, format %s, subject %s\n", decodedMsg.SchemaID, decodedMsg.SchemaFormat, decodedMsg.Subject) - // If SeaweedMQ integration is enabled, store the decoded message - if h.useSeaweedMQ && h.seaweedMQHandler != nil { - return h.storeDecodedMessage(topicName, partitionID, decodedMsg) - } - - // For in-memory mode, we could store metadata about the schema - // For now, just log the successful decoding - fmt.Printf("DEBUG: Schema decoding successful - would store RecordValue with %d fields\n", - len(decodedMsg.RecordValue.Fields)) - - return nil + // Store the decoded message using SeaweedMQ + return h.storeDecodedMessage(topicName, partitionID, decodedMsg) } // storeDecodedMessage stores a decoded message using mq.broker integration @@ -602,8 +557,8 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, decod return nil } - // Fallback to SeaweedMQ integration if available - if h.useSeaweedMQ && h.seaweedMQHandler != nil { + // Use SeaweedMQ integration + if h.seaweedMQHandler != nil { // Extract key and value from the original envelope (simplified) key := []byte(fmt.Sprintf("kafka-key-%d", time.Now().UnixNano())) value := decodedMsg.Envelope.Payload @@ -618,11 +573,7 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, decod return nil } - // For in-memory mode, just log the successful decoding - fmt.Printf("DEBUG: Schema decoding successful (in-memory mode) - topic: %s, partition: %d, schema: %d, fields: %d\n", - topicName, partitionID, decodedMsg.SchemaID, len(decodedMsg.RecordValue.Fields)) - - return nil + return fmt.Errorf("no SeaweedMQ handler available") } // extractMessagesFromRecordSet extracts individual messages from a record set with compression support