Browse Source

use broker client

pull/7231/head
chrislu 2 months ago
parent
commit
bd7b07e90c
  1. 368
      weed/mq/kafka/integration/seaweedmq_handler.go

368
weed/mq/kafka/integration/seaweedmq_handler.go

@ -14,12 +14,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
) )
// SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage // SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage
type SeaweedMQHandler struct { type SeaweedMQHandler struct {
agentClient *AgentClient
agentClient *AgentClient // For agent-based connections
brokerClient *BrokerClient // For broker-based connections
useBroker bool // Flag to determine which client to use
// Topic registry - still keep track of Kafka topics // Topic registry - still keep track of Kafka topics
topicsMu sync.RWMutex topicsMu sync.RWMutex
@ -61,6 +64,7 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) {
return &SeaweedMQHandler{ return &SeaweedMQHandler{
agentClient: agentClient, agentClient: agentClient,
useBroker: false,
topics: make(map[string]*KafkaTopicInfo), topics: make(map[string]*KafkaTopicInfo),
ledgers: make(map[TopicPartitionKey]*offset.Ledger), ledgers: make(map[TopicPartitionKey]*offset.Ledger),
}, nil }, nil
@ -68,8 +72,13 @@ func NewSeaweedMQHandler(agentAddress string) (*SeaweedMQHandler, error) {
// Close shuts down the handler and all connections // Close shuts down the handler and all connections
func (h *SeaweedMQHandler) Close() error { func (h *SeaweedMQHandler) Close() error {
if h.useBroker && h.brokerClient != nil {
return h.brokerClient.Close()
} else if h.agentClient != nil {
return h.agentClient.Close() return h.agentClient.Close()
} }
return nil
}
// CreateTopic creates a new topic in both Kafka registry and SeaweedMQ // CreateTopic creates a new topic in both Kafka registry and SeaweedMQ
func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error { func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error {
@ -121,8 +130,12 @@ func (h *SeaweedMQHandler) DeleteTopic(name string) error {
// Close all publisher sessions for this topic // Close all publisher sessions for this topic
for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ { for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {
if h.useBroker && h.brokerClient != nil {
h.brokerClient.ClosePublisher(name, partitionID)
} else if h.agentClient != nil {
h.agentClient.ClosePublisher(name, partitionID) h.agentClient.ClosePublisher(name, partitionID)
} }
}
// Remove from registry // Remove from registry
delete(h.topics, name) delete(h.topics, name)
@ -179,9 +192,17 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
timestamp := time.Now().UnixNano() timestamp := time.Now().UnixNano()
// Publish to SeaweedMQ // Publish to SeaweedMQ
_, err := h.agentClient.PublishRecord(topic, partition, key, value, timestamp)
if err != nil {
return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", err)
var publishErr error
if h.useBroker && h.brokerClient != nil {
_, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
} else if h.agentClient != nil {
_, publishErr = h.agentClient.PublishRecord(topic, partition, key, value, timestamp)
} else {
publishErr = fmt.Errorf("no client available")
}
if publishErr != nil {
return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", publishErr)
} }
// Update Kafka offset ledger // Update Kafka offset ledger
@ -256,10 +277,8 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs
} }
// Get or create subscriber session for this topic/partition // Get or create subscriber session for this topic/partition
subscriber, err := h.agentClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
if err != nil {
return nil, fmt.Errorf("failed to get subscriber: %v", err)
}
var seaweedRecords []*SeaweedRecord
var err error
// Calculate how many records to fetch // Calculate how many records to fetch
recordsToFetch := int(highWaterMark - fetchOffset) recordsToFetch := int(highWaterMark - fetchOffset)
@ -267,8 +286,23 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs
recordsToFetch = 100 // Limit batch size recordsToFetch = 100 // Limit batch size
} }
// Read real records from SeaweedMQ
seaweedRecords, err := h.agentClient.ReadRecords(subscriber, recordsToFetch)
// Read records using appropriate client
if h.useBroker && h.brokerClient != nil {
brokerSubscriber, subErr := h.brokerClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
if subErr != nil {
return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr)
}
seaweedRecords, err = h.brokerClient.ReadRecords(brokerSubscriber, recordsToFetch)
} else if h.agentClient != nil {
agentSubscriber, subErr := h.agentClient.GetOrCreateSubscriber(topic, partition, fetchOffset)
if subErr != nil {
return nil, fmt.Errorf("failed to get agent subscriber: %v", subErr)
}
seaweedRecords, err = h.agentClient.ReadRecords(agentSubscriber, recordsToFetch)
} else {
return nil, fmt.Errorf("no client available")
}
if err != nil { if err != nil {
// If no records available, return empty batch instead of error // If no records available, return empty batch instead of error
return []byte{}, nil return []byte{}, nil
@ -582,7 +616,8 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*SeaweedMQHan
} }
return &SeaweedMQHandler{ return &SeaweedMQHandler{
agentClient: brokerClient,
brokerClient: brokerClient,
useBroker: true,
topics: make(map[string]*KafkaTopicInfo), topics: make(map[string]*KafkaTopicInfo),
ledgers: make(map[TopicPartitionKey]*offset.Ledger), ledgers: make(map[TopicPartitionKey]*offset.Ledger),
}, nil }, nil
@ -626,10 +661,311 @@ func discoverBrokers(masterAddresses []string, filerGroup string) ([]string, err
return brokers, nil return brokers, nil
} }
// BrokerClient wraps the SeaweedMQ Broker gRPC client for Kafka gateway integration
type BrokerClient struct {
brokerAddress string
conn *grpc.ClientConn
client mq_pb.SeaweedMessagingClient
// Publisher streams: topic-partition -> stream info
publishersLock sync.RWMutex
publishers map[string]*BrokerPublisherSession
// Subscriber streams for offset tracking
subscribersLock sync.RWMutex
subscribers map[string]*BrokerSubscriberSession
ctx context.Context
cancel context.CancelFunc
}
// BrokerPublisherSession tracks a publishing stream to SeaweedMQ broker
type BrokerPublisherSession struct {
Topic string
Partition int32
Stream mq_pb.SeaweedMessaging_PublishMessageClient
}
// BrokerSubscriberSession tracks a subscription stream for offset management
type BrokerSubscriberSession struct {
Topic string
Partition int32
Stream mq_pb.SeaweedMessaging_SubscribeMessageClient
}
// NewBrokerClient creates a client that connects to a SeaweedMQ broker // NewBrokerClient creates a client that connects to a SeaweedMQ broker
// This reuses the AgentClient structure but connects to a broker instead
func NewBrokerClient(brokerAddress string) (*AgentClient, error) {
// For now, reuse the AgentClient implementation
// In the future, this could be enhanced to use broker-specific protocols
return NewAgentClient(brokerAddress)
func NewBrokerClient(brokerAddress string) (*BrokerClient, error) {
ctx, cancel := context.WithCancel(context.Background())
conn, err := grpc.DialContext(ctx, brokerAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to connect to broker %s: %v", brokerAddress, err)
}
client := mq_pb.NewSeaweedMessagingClient(conn)
return &BrokerClient{
brokerAddress: brokerAddress,
conn: conn,
client: client,
publishers: make(map[string]*BrokerPublisherSession),
subscribers: make(map[string]*BrokerSubscriberSession),
ctx: ctx,
cancel: cancel,
}, nil
}
// Close shuts down the broker client and all streams
func (bc *BrokerClient) Close() error {
bc.cancel()
// Close all publisher streams
bc.publishersLock.Lock()
for key := range bc.publishers {
delete(bc.publishers, key)
}
bc.publishersLock.Unlock()
// Close all subscriber streams
bc.subscribersLock.Lock()
for key := range bc.subscribers {
delete(bc.subscribers, key)
}
bc.subscribersLock.Unlock()
return bc.conn.Close()
}
// PublishRecord publishes a single record to SeaweedMQ broker
func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
session, err := bc.getOrCreatePublisher(topic, partition)
if err != nil {
return 0, err
}
// Send data message using broker API format
dataMsg := &mq_pb.DataMessage{
Key: key,
Value: value,
TsNs: timestamp,
}
if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Data{
Data: dataMsg,
},
}); err != nil {
return 0, fmt.Errorf("failed to send data: %v", err)
}
// Read acknowledgment
resp, err := session.Stream.Recv()
if err != nil {
return 0, fmt.Errorf("failed to receive ack: %v", err)
}
if resp.Error != "" {
return 0, fmt.Errorf("publish error: %s", resp.Error)
}
return resp.AckSequence, nil
}
// getOrCreatePublisher gets or creates a publisher stream for a topic-partition
func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*BrokerPublisherSession, error) {
key := fmt.Sprintf("%s-%d", topic, partition)
// Try to get existing publisher
bc.publishersLock.RLock()
if session, exists := bc.publishers[key]; exists {
bc.publishersLock.RUnlock()
return session, nil
}
bc.publishersLock.RUnlock()
// Create new publisher stream
bc.publishersLock.Lock()
defer bc.publishersLock.Unlock()
// Double-check after acquiring write lock
if session, exists := bc.publishers[key]; exists {
return session, nil
}
// Create the stream
stream, err := bc.client.PublishMessage(bc.ctx)
if err != nil {
return nil, fmt.Errorf("failed to create publish stream: %v", err)
}
// Send init message
if err := stream.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
Partition: &schema_pb.Partition{
RingSize: 1024, // Standard ring size
RangeStart: partition * 32,
RangeStop: (partition+1)*32 - 1,
},
AckInterval: 100,
PublisherName: "kafka-gateway",
},
},
}); err != nil {
return nil, fmt.Errorf("failed to send init message: %v", err)
}
session := &BrokerPublisherSession{
Topic: topic,
Partition: partition,
Stream: stream,
}
bc.publishers[key] = session
return session, nil
}
// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*BrokerSubscriberSession, error) {
key := fmt.Sprintf("%s-%d", topic, partition)
bc.subscribersLock.RLock()
if session, exists := bc.subscribers[key]; exists {
bc.subscribersLock.RUnlock()
return session, nil
}
bc.subscribersLock.RUnlock()
// Create new subscriber stream
bc.subscribersLock.Lock()
defer bc.subscribersLock.Unlock()
if session, exists := bc.subscribers[key]; exists {
return session, nil
}
stream, err := bc.client.SubscribeMessage(bc.ctx)
if err != nil {
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
}
// Send init message
if err := stream.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: "kafka-gateway",
ConsumerId: fmt.Sprintf("kafka-gateway-%s-%d", topic, partition),
ClientId: "kafka-gateway",
Topic: &schema_pb.Topic{
Namespace: "kafka",
Name: topic,
},
PartitionOffset: &schema_pb.PartitionOffset{
Partition: &schema_pb.Partition{
RingSize: 1024,
RangeStart: partition * 32,
RangeStop: (partition+1)*32 - 1,
},
StartTsNs: startOffset,
},
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
SlidingWindowSize: 10,
},
},
}); err != nil {
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
}
session := &BrokerSubscriberSession{
Topic: topic,
Partition: partition,
Stream: stream,
}
bc.subscribers[key] = session
return session, nil
}
// ReadRecords reads available records from the subscriber stream
func (bc *BrokerClient) ReadRecords(session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
if session == nil {
return nil, fmt.Errorf("subscriber session cannot be nil")
}
var records []*SeaweedRecord
for len(records) < maxRecords {
// Try to receive a message with timeout
ctx, cancel := context.WithTimeout(bc.ctx, 100*time.Millisecond)
select {
case <-ctx.Done():
cancel()
return records, nil // Return what we have so far
default:
resp, err := session.Stream.Recv()
cancel()
if err != nil {
// If we have some records, return them; otherwise return error
if len(records) > 0 {
return records, nil
}
return nil, fmt.Errorf("failed to receive record: %v", err)
}
if dataMsg := resp.GetData(); dataMsg != nil {
record := &SeaweedRecord{
Key: dataMsg.Key,
Value: dataMsg.Value,
Timestamp: dataMsg.TsNs,
Sequence: 0, // Will be set by offset ledger
}
records = append(records, record)
}
}
}
return records, nil
}
// HealthCheck verifies the broker connection is working
func (bc *BrokerClient) HealthCheck() error {
// Create a timeout context for health check
ctx, cancel := context.WithTimeout(bc.ctx, 2*time.Second)
defer cancel()
// Try to list topics as a health check
_, err := bc.client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
if err != nil {
return fmt.Errorf("broker health check failed: %v", err)
}
return nil
}
// ClosePublisher closes a specific publisher session
func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error {
key := fmt.Sprintf("%s-%d", topic, partition)
bc.publishersLock.Lock()
defer bc.publishersLock.Unlock()
session, exists := bc.publishers[key]
if !exists {
return nil // Already closed or never existed
}
if session.Stream != nil {
session.Stream.CloseSend()
}
delete(bc.publishers, key)
return nil
} }
Loading…
Cancel
Save