From bd7b07e90cd5a8ac7524cae60eafb672febdb8a9 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 12 Sep 2025 18:35:07 -0700
Subject: [PATCH] use broker client

---
 .../mq/kafka/integration/seaweedmq_handler.go | 376 +++++++++++++++++-
 1 file changed, 356 insertions(+), 20 deletions(-)

diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go
index bff598dd8..19e68bed1 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler.go
@@ -14,12 +14,15 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/cluster"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 )
 
 // SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage
 type SeaweedMQHandler struct {
-	agentClient *AgentClient
+	agentClient  *AgentClient  // For agent-based connections
+	brokerClient *BrokerClient // For broker-based connections
+	useBroker    bool          // Flag to determine which client to use
 
 	// Topic registry - still keep track of Kafka topics
 	topicsMu sync.RWMutex
@@ -61,6 +64,7 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) {
 
 	return &SeaweedMQHandler{
 		agentClient: agentClient,
+		useBroker:   false,
 		topics:      make(map[string]*KafkaTopicInfo),
 		ledgers:     make(map[TopicPartitionKey]*offset.Ledger),
 	}, nil
@@ -68,7 +72,12 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) {
 
 // Close shuts down the handler and all connections
 func (h *SeaweedMQHandler) Close() error {
-	return h.agentClient.Close()
+	if h.useBroker && h.brokerClient != nil {
+		return h.brokerClient.Close()
+	} else if h.agentClient != nil {
+		return h.agentClient.Close()
+	}
+	return nil
 }
 
 // CreateTopic creates a new topic in both Kafka registry and SeaweedMQ
@@ -121,7 +130,11 @@ func (h *SeaweedMQHandler) DeleteTopic(name string) error {
 
 	// Close all publisher sessions for this topic
 	for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {
-		h.agentClient.ClosePublisher(name, partitionID)
+		if h.useBroker && h.brokerClient != nil {
+			h.brokerClient.ClosePublisher(name, partitionID)
+		} else if h.agentClient != nil {
+			h.agentClient.ClosePublisher(name, partitionID)
+		}
 	}
 
 	// Remove from registry
@@ -179,9 +192,17 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
 	timestamp := time.Now().UnixNano()
 
 	// Publish to SeaweedMQ
-	_, err := h.agentClient.PublishRecord(topic, partition, key, value, timestamp)
-	if err != nil {
-		return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", err)
+	var publishErr error
+	if h.useBroker && h.brokerClient != nil {
+		_, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
+	} else if h.agentClient != nil {
+		_, publishErr = h.agentClient.PublishRecord(topic, partition, key, value, timestamp)
+	} else {
+		publishErr = fmt.Errorf("no client available")
+	}
+
+	if publishErr != nil {
+		return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", publishErr)
 	}
 
 	// Update Kafka offset ledger
@@ -256,10 +277,8 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs
 	}
 
 	// Get or create subscriber session for this topic/partition
-	subscriber, err := h.agentClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get subscriber: %v", err)
-	}
+	var seaweedRecords []*SeaweedRecord
+	var err error
 
 	// Calculate how many records to fetch
 	recordsToFetch := int(highWaterMark - fetchOffset)
@@ -267,8 +286,23 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs
 		recordsToFetch = 100 // Limit batch size
 	}
 
-	// Read real records from SeaweedMQ
-	seaweedRecords, err := h.agentClient.ReadRecords(subscriber, recordsToFetch)
+	// Read records using appropriate client
+	if h.useBroker && h.brokerClient != nil {
+		brokerSubscriber, subErr := h.brokerClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
+		if subErr != nil {
+			return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr)
+		}
+		seaweedRecords, err = h.brokerClient.ReadRecords(brokerSubscriber, recordsToFetch)
+	} else if h.agentClient != nil {
+		agentSubscriber, subErr := h.agentClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
+		if subErr != nil {
+			return nil, fmt.Errorf("failed to get agent subscriber: %v", subErr)
+		}
+		seaweedRecords, err = h.agentClient.ReadRecords(agentSubscriber, recordsToFetch)
+	} else {
+		return nil, fmt.Errorf("no client available")
+	}
+
 	if err != nil {
 		// If no records available, return empty batch instead of error
 		return []byte{}, nil
@@ -582,9 +616,10 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*SeaweedMQHan
 	}
 
 	return &SeaweedMQHandler{
-		agentClient: brokerClient,
-		topics:      make(map[string]*KafkaTopicInfo),
-		ledgers:     make(map[TopicPartitionKey]*offset.Ledger),
+		brokerClient: brokerClient,
+		useBroker:    true,
+		topics:       make(map[string]*KafkaTopicInfo),
+		ledgers:      make(map[TopicPartitionKey]*offset.Ledger),
 	}, nil
 }
 
@@ -626,10 +661,311 @@ func discoverBrokers(masterAddresses []string, filerGroup string) ([]string, err
 	return brokers, nil
 }
 
+// BrokerClient wraps the SeaweedMQ broker gRPC client for Kafka gateway integration
+type BrokerClient struct {
+	brokerAddress string
+	conn          *grpc.ClientConn
+	client        mq_pb.SeaweedMessagingClient
+
+	// Publisher streams: topic-partition -> stream info
+	publishersLock sync.RWMutex
+	publishers     map[string]*BrokerPublisherSession
+
+	// Subscriber streams for offset tracking
+	subscribersLock sync.RWMutex
+	subscribers     map[string]*BrokerSubscriberSession
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+// BrokerPublisherSession tracks a publishing stream to a SeaweedMQ broker
+type BrokerPublisherSession struct {
+	Topic     string
+	Partition int32
+	Stream    mq_pb.SeaweedMessaging_PublishMessageClient
+}
+
+// BrokerSubscriberSession tracks a subscription stream for offset management.
+// A background goroutine pumps the stream into respCh/errCh so that reads can
+// honor a deadline without abandoning an in-flight Recv (see ReadRecords).
+type BrokerSubscriberSession struct {
+	Topic     string
+	Partition int32
+	Stream    mq_pb.SeaweedMessaging_SubscribeMessageClient
+	respCh    chan *mq_pb.SubscribeMessageResponse
+	errCh     chan error
+}
+
 // NewBrokerClient creates a client that connects to a SeaweedMQ broker
-// This reuses the AgentClient structure but connects to a broker instead
-func NewBrokerClient(brokerAddress string) (*AgentClient, error) {
-	// For now, reuse the AgentClient implementation
-	// In the future, this could be enhanced to use broker-specific protocols
-	return NewAgentClient(brokerAddress)
+func NewBrokerClient(brokerAddress string) (*BrokerClient, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	conn, err := grpc.DialContext(ctx, brokerAddress,
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	)
+	if err != nil {
+		cancel()
+		return nil, fmt.Errorf("failed to connect to broker %s: %v", brokerAddress, err)
+	}
+
+	client := mq_pb.NewSeaweedMessagingClient(conn)
+
+	return &BrokerClient{
+		brokerAddress: brokerAddress,
+		conn:          conn,
+		client:        client,
+		publishers:    make(map[string]*BrokerPublisherSession),
+		subscribers:   make(map[string]*BrokerSubscriberSession),
+		ctx:           ctx,
+		cancel:        cancel,
+	}, nil
+}
+
+// Close shuts down the broker client and all streams
+func (bc *BrokerClient) Close() error {
+	bc.cancel()
+
+	// Close all publisher streams
+	bc.publishersLock.Lock()
+	for key, session := range bc.publishers {
+		if session.Stream != nil {
+			session.Stream.CloseSend()
+		}
+		delete(bc.publishers, key)
+	}
+	bc.publishersLock.Unlock()
+
+	// Close all subscriber streams
+	bc.subscribersLock.Lock()
+	for key, session := range bc.subscribers {
+		if session.Stream != nil {
+			session.Stream.CloseSend()
+		}
+		delete(bc.subscribers, key)
+	}
+	bc.subscribersLock.Unlock()
+
+	return bc.conn.Close()
+}
+
+// PublishRecord publishes a single record to the SeaweedMQ broker
+func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
+	session, err := bc.getOrCreatePublisher(topic, partition)
+	if err != nil {
+		return 0, err
+	}
+
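+	// NOTE: the Send/Recv pairing below assumes that producers for a given
+	// topic-partition are serialized by the caller; concurrent PublishRecord
+	// calls on one session could otherwise match acks to the wrong request.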
+	// Send data message using broker API format
+	dataMsg := &mq_pb.DataMessage{
+		Key:   key,
+		Value: value,
+		TsNs:  timestamp,
+	}
+
+	if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
+		Message: &mq_pb.PublishMessageRequest_Data{
+			Data: dataMsg,
+		},
+	}); err != nil {
+		return 0, fmt.Errorf("failed to send data: %v", err)
+	}
+
+	// Read acknowledgment
+	resp, err := session.Stream.Recv()
+	if err != nil {
+		return 0, fmt.Errorf("failed to receive ack: %v", err)
+	}
+
+	if resp.Error != "" {
+		return 0, fmt.Errorf("publish error: %s", resp.Error)
+	}
+
+	return resp.AckSequence, nil
+}
+
+// getOrCreatePublisher gets or creates a publisher stream for a topic-partition
+func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*BrokerPublisherSession, error) {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	// Try to get existing publisher
+	bc.publishersLock.RLock()
+	if session, exists := bc.publishers[key]; exists {
+		bc.publishersLock.RUnlock()
+		return session, nil
+	}
+	bc.publishersLock.RUnlock()
+
+	// Create new publisher stream
+	bc.publishersLock.Lock()
+	defer bc.publishersLock.Unlock()
+
+	// Double-check after acquiring write lock
+	if session, exists := bc.publishers[key]; exists {
+		return session, nil
+	}
+
+	// Create the stream
+	stream, err := bc.client.PublishMessage(bc.ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create publish stream: %v", err)
+	}
+
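+	// NOTE: the ring math below carves SeaweedMQ's 1024-slot ring into fixed
+	// 32-slot blocks, one block per Kafka partition, which implicitly caps a
+	// topic at 32 partitions; the subscriber side must use the same mapping.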
+	// Send init message
+	if err := stream.Send(&mq_pb.PublishMessageRequest{
+		Message: &mq_pb.PublishMessageRequest_Init{
+			Init: &mq_pb.PublishMessageRequest_InitMessage{
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				Partition: &schema_pb.Partition{
+					RingSize:   1024, // Standard ring size
+					RangeStart: partition * 32,
+					RangeStop:  (partition+1)*32 - 1,
+				},
+				AckInterval:   100,
+				PublisherName: "kafka-gateway",
+			},
+		},
+	}); err != nil {
+		return nil, fmt.Errorf("failed to send init message: %v", err)
+	}
+
+	session := &BrokerPublisherSession{
+		Topic:     topic,
+		Partition: partition,
+		Stream:    stream,
+	}
+
+	bc.publishers[key] = session
+	return session, nil
+}
+
+// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
+func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*BrokerSubscriberSession, error) {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	bc.subscribersLock.RLock()
+	if session, exists := bc.subscribers[key]; exists {
+		bc.subscribersLock.RUnlock()
+		return session, nil
+	}
+	bc.subscribersLock.RUnlock()
+
+	// Create new subscriber stream
+	bc.subscribersLock.Lock()
+	defer bc.subscribersLock.Unlock()
+
+	if session, exists := bc.subscribers[key]; exists {
+		return session, nil
+	}
+
+	stream, err := bc.client.SubscribeMessage(bc.ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
+	}
+
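+	// NOTE: startOffset is sent as StartTsNs with OffsetType_EXACT_TS_NS, so
+	// the broker interprets the Kafka fetch offset as a nanosecond timestamp;
+	// this only lines up if callers translate ledger offsets to timestamps.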
+	// Send init message
+	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
+		Message: &mq_pb.SubscribeMessageRequest_Init{
+			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+				ConsumerGroup: "kafka-gateway",
+				ConsumerId:    fmt.Sprintf("kafka-gateway-%s-%d", topic, partition),
+				ClientId:      "kafka-gateway",
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				PartitionOffset: &schema_pb.PartitionOffset{
+					Partition: &schema_pb.Partition{
+						RingSize:   1024,
+						RangeStart: partition * 32,
+						RangeStop:  (partition+1)*32 - 1,
+					},
+					StartTsNs: startOffset,
+				},
+				OffsetType:        schema_pb.OffsetType_EXACT_TS_NS,
+				SlidingWindowSize: 10,
+			},
+		},
+	}); err != nil {
+		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+	}
+
+	session := &BrokerSubscriberSession{
+		Topic:     topic,
+		Partition: partition,
+		Stream:    stream,
+		respCh:    make(chan *mq_pb.SubscribeMessageResponse, 64),
+		errCh:     make(chan error, 1),
+	}
+
+	// Pump the stream from a single goroutine so ReadRecords can apply a
+	// deadline without dropping whatever an abandoned Recv already consumed.
+	go func() {
+		for {
+			resp, err := stream.Recv()
+			if err != nil {
+				session.errCh <- err
+				return
+			}
+			select {
+			case session.respCh <- resp:
+			case <-bc.ctx.Done():
+				return
+			}
+		}
+	}()
+
+	bc.subscribers[key] = session
+	return session, nil
+}
+
+// ReadRecords reads available records from the subscriber stream, returning
+// whatever has arrived by the time the poll deadline expires
+func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
+	if session == nil {
+		return nil, fmt.Errorf("subscriber session cannot be nil")
+	}
+
+	var records []*SeaweedRecord
+	deadline := time.NewTimer(100 * time.Millisecond)
+	defer deadline.Stop()
+
+	for len(records) < maxRecords {
+		select {
+		case resp := <-session.respCh:
+			if dataMsg := resp.GetData(); dataMsg != nil {
+				records = append(records, &SeaweedRecord{
+					Key:       dataMsg.Key,
+					Value:     dataMsg.Value,
+					Timestamp: dataMsg.TsNs,
+					Sequence:  0, // Will be set by offset ledger
+				})
+			}
+		case err := <-session.errCh:
+			// If we have some records, return them; otherwise return the error
+			if len(records) > 0 {
+				return records, nil
+			}
+			return nil, fmt.Errorf("failed to receive record: %v", err)
+		case <-deadline.C:
+			return records, nil // Return what we have so far
+		case <-bc.ctx.Done():
+			return records, nil
+		}
+	}
+
+	return records, nil
+}
+
+// HealthCheck verifies the broker connection is working
+func (bc *BrokerClient) HealthCheck() error {
+	// Create a timeout context for the health check
+	ctx, cancel := context.WithTimeout(bc.ctx, 2*time.Second)
+	defer cancel()
+
+	// Try to list topics as a health check
+	_, err := bc.client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
+	if err != nil {
+		return fmt.Errorf("broker health check failed: %v", err)
+	}
+
+	return nil
+}
+
+// ClosePublisher closes a specific publisher session
+func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	bc.publishersLock.Lock()
+	defer bc.publishersLock.Unlock()
+
+	session, exists := bc.publishers[key]
+	if !exists {
+		return nil // Already closed or never existed
+	}
+
+	if session.Stream != nil {
+		session.Stream.CloseSend()
+	}
+	delete(bc.publishers, key)
+	return nil
 }
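+
+// CloseSubscriber closes a specific subscriber session. This is a minimal
+// sketch mirroring ClosePublisher: nothing calls it yet, but it keeps the two
+// session maps symmetric for callers that want to drop a subscription early.
+func (bc *BrokerClient) CloseSubscriber(topic string, partition int32) error {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	bc.subscribersLock.Lock()
+	defer bc.subscribersLock.Unlock()
+
+	session, exists := bc.subscribers[key]
+	if !exists {
+		return nil // Already closed or never existed
+	}
+
+	if session.Stream != nil {
+		session.Stream.CloseSend()
+	}
+	delete(bc.subscribers, key)
+	return nil
+}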