You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
510 lines
14 KiB
510 lines
14 KiB
package integration
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
|
)
|
|
|
|
// AgentClient wraps the SeaweedMQ Agent gRPC client for Kafka gateway integration
|
|
type AgentClient struct {
|
|
agentAddress string
|
|
conn *grpc.ClientConn
|
|
client mq_agent_pb.SeaweedMessagingAgentClient
|
|
|
|
// Publisher sessions: topic-partition -> session info
|
|
publishersLock sync.RWMutex
|
|
publishers map[string]*PublisherSession
|
|
|
|
// Subscriber sessions for offset tracking
|
|
subscribersLock sync.RWMutex
|
|
subscribers map[string]*SubscriberSession
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// PublisherSession tracks a publishing session to SeaweedMQ
|
|
type PublisherSession struct {
|
|
SessionID int64
|
|
Topic string
|
|
Partition int32
|
|
Stream mq_agent_pb.SeaweedMessagingAgent_PublishRecordClient
|
|
RecordType *schema_pb.RecordType
|
|
LastSequence int64
|
|
}
|
|
|
|
// SubscriberSession tracks a subscription for offset management
|
|
type SubscriberSession struct {
|
|
Topic string
|
|
Partition int32
|
|
Stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordClient
|
|
OffsetLedger *offset.Ledger // Still use for Kafka offset translation
|
|
}
|
|
|
|
// SeaweedRecord represents a record received from SeaweedMQ
|
|
type SeaweedRecord struct {
|
|
Key []byte
|
|
Value []byte
|
|
Timestamp int64
|
|
Sequence int64
|
|
}
|
|
|
|
// NewAgentClient creates a new SeaweedMQ Agent client
|
|
func NewAgentClient(agentAddress string) (*AgentClient, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
conn, err := grpc.DialContext(ctx, agentAddress,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
// Don't block - fail fast for invalid addresses
|
|
)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("failed to connect to agent %s: %v", agentAddress, err)
|
|
}
|
|
|
|
client := mq_agent_pb.NewSeaweedMessagingAgentClient(conn)
|
|
|
|
return &AgentClient{
|
|
agentAddress: agentAddress,
|
|
conn: conn,
|
|
client: client,
|
|
publishers: make(map[string]*PublisherSession),
|
|
subscribers: make(map[string]*SubscriberSession),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}, nil
|
|
}
|
|
|
|
// Close shuts down the agent client and all sessions
|
|
func (ac *AgentClient) Close() error {
|
|
ac.cancel()
|
|
|
|
// Close all publisher sessions
|
|
ac.publishersLock.Lock()
|
|
for key, session := range ac.publishers {
|
|
ac.closePublishSessionLocked(session.SessionID)
|
|
delete(ac.publishers, key)
|
|
}
|
|
ac.publishersLock.Unlock()
|
|
|
|
// Close all subscriber sessions
|
|
ac.subscribersLock.Lock()
|
|
for key, session := range ac.subscribers {
|
|
if session.Stream != nil {
|
|
session.Stream.CloseSend()
|
|
}
|
|
delete(ac.subscribers, key)
|
|
}
|
|
ac.subscribersLock.Unlock()
|
|
|
|
return ac.conn.Close()
|
|
}
|
|
|
|
// GetOrCreatePublisher gets or creates a publisher session for a topic-partition
|
|
func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*PublisherSession, error) {
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
|
|
// Try to get existing publisher
|
|
ac.publishersLock.RLock()
|
|
if session, exists := ac.publishers[key]; exists {
|
|
ac.publishersLock.RUnlock()
|
|
return session, nil
|
|
}
|
|
ac.publishersLock.RUnlock()
|
|
|
|
// Create new publisher session
|
|
ac.publishersLock.Lock()
|
|
defer ac.publishersLock.Unlock()
|
|
|
|
// Double-check after acquiring write lock
|
|
if session, exists := ac.publishers[key]; exists {
|
|
return session, nil
|
|
}
|
|
|
|
// Create the session
|
|
session, err := ac.createPublishSession(topic, partition)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ac.publishers[key] = session
|
|
return session, nil
|
|
}
|
|
|
|
// createPublishSession creates a new publishing session with SeaweedMQ Agent
|
|
func (ac *AgentClient) createPublishSession(topic string, partition int32) (*PublisherSession, error) {
|
|
// Create a basic record type for Kafka messages
|
|
recordType := &schema_pb.RecordType{
|
|
Fields: []*schema_pb.Field{
|
|
{
|
|
Name: "key",
|
|
FieldIndex: 0,
|
|
Type: &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
|
|
},
|
|
IsRequired: false,
|
|
IsRepeated: false,
|
|
},
|
|
{
|
|
Name: "value",
|
|
FieldIndex: 1,
|
|
Type: &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
|
|
},
|
|
IsRequired: true,
|
|
IsRepeated: false,
|
|
},
|
|
{
|
|
Name: "timestamp",
|
|
FieldIndex: 2,
|
|
Type: &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
|
|
},
|
|
IsRequired: false,
|
|
IsRepeated: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
// Start publish session
|
|
startReq := &mq_agent_pb.StartPublishSessionRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: "kafka", // Use "kafka" namespace for Kafka messages
|
|
Name: topic,
|
|
},
|
|
PartitionCount: 1, // For Phase 2, use single partition
|
|
RecordType: recordType,
|
|
PublisherName: "kafka-gateway",
|
|
}
|
|
|
|
startResp, err := ac.client.StartPublishSession(ac.ctx, startReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to start publish session: %v", err)
|
|
}
|
|
|
|
if startResp.Error != "" {
|
|
return nil, fmt.Errorf("publish session error: %s", startResp.Error)
|
|
}
|
|
|
|
// Create streaming connection
|
|
stream, err := ac.client.PublishRecord(ac.ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create publish stream: %v", err)
|
|
}
|
|
|
|
session := &PublisherSession{
|
|
SessionID: startResp.SessionId,
|
|
Topic: topic,
|
|
Partition: partition,
|
|
Stream: stream,
|
|
RecordType: recordType,
|
|
}
|
|
|
|
return session, nil
|
|
}
|
|
|
|
// PublishRecord publishes a single record to SeaweedMQ
|
|
func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
|
|
session, err := ac.GetOrCreatePublisher(topic, partition)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Convert to SeaweedMQ record format
|
|
record := &schema_pb.RecordValue{
|
|
Fields: map[string]*schema_pb.Value{
|
|
"key": {
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: key},
|
|
},
|
|
"value": {
|
|
Kind: &schema_pb.Value_BytesValue{BytesValue: value},
|
|
},
|
|
"timestamp": {
|
|
Kind: &schema_pb.Value_TimestampValue{
|
|
TimestampValue: &schema_pb.TimestampValue{
|
|
TimestampMicros: timestamp / 1000, // Convert nanoseconds to microseconds
|
|
IsUtc: true,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Send publish request
|
|
req := &mq_agent_pb.PublishRecordRequest{
|
|
SessionId: session.SessionID,
|
|
Key: key,
|
|
Value: record,
|
|
}
|
|
|
|
if err := session.Stream.Send(req); err != nil {
|
|
return 0, fmt.Errorf("failed to send record: %v", err)
|
|
}
|
|
|
|
// Read acknowledgment (this is a streaming API, so we should read the response)
|
|
resp, err := session.Stream.Recv()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to receive ack: %v", err)
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
return 0, fmt.Errorf("publish error: %s", resp.Error)
|
|
}
|
|
|
|
session.LastSequence = resp.AckSequence
|
|
return resp.AckSequence, nil
|
|
}
|
|
|
|
// PublishRecordValue publishes a RecordValue message to SeaweedMQ via agent
|
|
func (ac *AgentClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
|
|
session, err := ac.GetOrCreatePublisher(topic, partition)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Unmarshal the RecordValue to send it properly
|
|
record := &schema_pb.RecordValue{}
|
|
if err := proto.Unmarshal(recordValueBytes, record); err != nil {
|
|
return 0, fmt.Errorf("failed to unmarshal RecordValue: %v", err)
|
|
}
|
|
|
|
// Send the RecordValue directly
|
|
req := &mq_agent_pb.PublishRecordRequest{
|
|
SessionId: session.SessionID,
|
|
Key: key,
|
|
Value: record,
|
|
}
|
|
|
|
if err := session.Stream.Send(req); err != nil {
|
|
return 0, fmt.Errorf("failed to send RecordValue: %v", err)
|
|
}
|
|
|
|
// Read acknowledgment
|
|
resp, err := session.Stream.Recv()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to receive RecordValue ack: %v", err)
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
return 0, fmt.Errorf("RecordValue publish error: %s", resp.Error)
|
|
}
|
|
|
|
session.LastSequence = resp.AckSequence
|
|
return resp.AckSequence, nil
|
|
}
|
|
|
|
// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
|
|
func (ac *AgentClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64) (*SubscriberSession, error) {
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
|
|
ac.subscribersLock.RLock()
|
|
if session, exists := ac.subscribers[key]; exists {
|
|
ac.subscribersLock.RUnlock()
|
|
return session, nil
|
|
}
|
|
ac.subscribersLock.RUnlock()
|
|
|
|
// Create new subscriber session
|
|
ac.subscribersLock.Lock()
|
|
defer ac.subscribersLock.Unlock()
|
|
|
|
if session, exists := ac.subscribers[key]; exists {
|
|
return session, nil
|
|
}
|
|
|
|
session, err := ac.createSubscribeSession(topic, partition, startOffset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ac.subscribers[key] = session
|
|
return session, nil
|
|
}
|
|
|
|
// createSubscribeSession creates a subscriber session for reading messages
|
|
func (ac *AgentClient) createSubscribeSession(topic string, partition int32, startOffset int64) (*SubscriberSession, error) {
|
|
stream, err := ac.client.SubscribeRecord(ac.ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
|
|
}
|
|
|
|
// Send initial subscribe request
|
|
initReq := &mq_agent_pb.SubscribeRecordRequest{
|
|
Init: &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{
|
|
ConsumerGroup: "kafka-gateway",
|
|
ConsumerGroupInstanceId: fmt.Sprintf("kafka-gateway-%s-%d", topic, partition),
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: "kafka",
|
|
Name: topic,
|
|
},
|
|
PartitionOffsets: []*schema_pb.PartitionOffset{
|
|
{
|
|
// Map Kafka partition to specific SMQ ring range using centralized utility
|
|
Partition: kafka.CreateSMQPartition(partition, time.Now().UnixNano()),
|
|
StartTsNs: startOffset, // Use offset as timestamp for now
|
|
},
|
|
},
|
|
OffsetType: schema_pb.OffsetType_EXACT_TS_NS,
|
|
MaxSubscribedPartitions: 1,
|
|
SlidingWindowSize: 10,
|
|
},
|
|
}
|
|
|
|
if err := stream.Send(initReq); err != nil {
|
|
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
|
|
}
|
|
|
|
session := &SubscriberSession{
|
|
Topic: topic,
|
|
Partition: partition,
|
|
Stream: stream,
|
|
OffsetLedger: offset.NewLedger(), // Keep Kafka offset tracking
|
|
}
|
|
|
|
return session, nil
|
|
}
|
|
|
|
// ClosePublisher closes a specific publisher session
|
|
func (ac *AgentClient) ClosePublisher(topic string, partition int32) error {
|
|
key := fmt.Sprintf("%s-%d", topic, partition)
|
|
|
|
ac.publishersLock.Lock()
|
|
defer ac.publishersLock.Unlock()
|
|
|
|
session, exists := ac.publishers[key]
|
|
if !exists {
|
|
return nil // Already closed or never existed
|
|
}
|
|
|
|
err := ac.closePublishSessionLocked(session.SessionID)
|
|
delete(ac.publishers, key)
|
|
return err
|
|
}
|
|
|
|
// closePublishSessionLocked closes a publish session (must be called with lock held)
|
|
func (ac *AgentClient) closePublishSessionLocked(sessionID int64) error {
|
|
closeReq := &mq_agent_pb.ClosePublishSessionRequest{
|
|
SessionId: sessionID,
|
|
}
|
|
|
|
_, err := ac.client.ClosePublishSession(ac.ctx, closeReq)
|
|
return err
|
|
}
|
|
|
|
// ReadRecords reads available records from the subscriber session
|
|
func (ac *AgentClient) ReadRecords(session *SubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
|
|
if session == nil {
|
|
return nil, fmt.Errorf("subscriber session cannot be nil")
|
|
}
|
|
|
|
var records []*SeaweedRecord
|
|
|
|
for len(records) < maxRecords {
|
|
// Try to receive a message with timeout to avoid blocking indefinitely
|
|
ctx, cancel := context.WithTimeout(ac.ctx, 100*time.Millisecond)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
cancel()
|
|
return records, nil // Return what we have so far
|
|
default:
|
|
// Try to receive a record
|
|
resp, err := session.Stream.Recv()
|
|
cancel()
|
|
|
|
if err != nil {
|
|
// If we have some records, return them; otherwise return error
|
|
if len(records) > 0 {
|
|
return records, nil
|
|
}
|
|
return nil, fmt.Errorf("failed to receive record: %v", err)
|
|
}
|
|
|
|
if resp.Value != nil || resp.Key != nil {
|
|
// Convert SeaweedMQ record to our format
|
|
record := &SeaweedRecord{
|
|
Sequence: resp.Offset, // Use offset as sequence
|
|
Timestamp: resp.TsNs, // Timestamp in nanoseconds
|
|
Key: resp.Key, // Raw key
|
|
}
|
|
|
|
// Extract value from the structured record
|
|
if resp.Value != nil && resp.Value.Fields != nil {
|
|
if valueValue, exists := resp.Value.Fields["value"]; exists && valueValue.GetBytesValue() != nil {
|
|
record.Value = valueValue.GetBytesValue()
|
|
}
|
|
// Also check for key in structured fields if raw key is empty
|
|
if len(record.Key) == 0 {
|
|
if keyValue, exists := resp.Value.Fields["key"]; exists && keyValue.GetBytesValue() != nil {
|
|
record.Key = keyValue.GetBytesValue()
|
|
}
|
|
}
|
|
// Override timestamp if available in structured fields
|
|
if timestampValue, exists := resp.Value.Fields["timestamp"]; exists && timestampValue.GetTimestampValue() != nil {
|
|
record.Timestamp = timestampValue.GetTimestampValue().TimestampMicros * 1000 // Convert to nanoseconds
|
|
}
|
|
}
|
|
|
|
records = append(records, record)
|
|
}
|
|
}
|
|
}
|
|
|
|
return records, nil
|
|
}
|
|
|
|
// HealthCheck verifies the agent connection is working
|
|
func (ac *AgentClient) HealthCheck() error {
|
|
// Create a timeout context for health check
|
|
ctx, cancel := context.WithTimeout(ac.ctx, 2*time.Second)
|
|
defer cancel()
|
|
|
|
// Try to start and immediately close a dummy session
|
|
req := &mq_agent_pb.StartPublishSessionRequest{
|
|
Topic: &schema_pb.Topic{
|
|
Namespace: "kafka",
|
|
Name: "_health_check",
|
|
},
|
|
PartitionCount: 1,
|
|
RecordType: &schema_pb.RecordType{
|
|
Fields: []*schema_pb.Field{
|
|
{
|
|
Name: "test",
|
|
FieldIndex: 0,
|
|
Type: &schema_pb.Type{
|
|
Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
PublisherName: "health-check",
|
|
}
|
|
|
|
resp, err := ac.client.StartPublishSession(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("health check failed: %v", err)
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("health check error: %s", resp.Error)
|
|
}
|
|
|
|
// Close the health check session
|
|
closeReq := &mq_agent_pb.ClosePublishSessionRequest{
|
|
SessionId: resp.SessionId,
|
|
}
|
|
_, _ = ac.client.ClosePublishSession(ctx, closeReq)
|
|
|
|
return nil
|
|
}
|