You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

380 lines
12 KiB

package offset
import (
"fmt"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// SMQOffsetIntegration provides integration between offset management and SMQ broker
type SMQOffsetIntegration struct {
	// mu is taken (RLock) by the metrics/listing methods while they iterate
	// the subscriber's subscription map and the assigner's registry, and
	// (Lock) by Reset while it swaps the registry.
	mu sync.RWMutex

	offsetAssigner   *OffsetAssigner   // assigns new offsets for published records
	offsetSubscriber *OffsetSubscriber // tracks offset-based subscriptions
	offsetSeeker     *OffsetSeeker     // validates/inspects offset ranges
}
// NewSMQOffsetIntegration creates a new SMQ offset integration backed by the
// given storage. The assigner, subscriber and seeker all share one partition
// offset registry so they observe a consistent view of assigned offsets.
func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration {
	reg := NewPartitionOffsetRegistry(storage)
	return &SMQOffsetIntegration{
		offsetAssigner:   &OffsetAssigner{registry: reg},
		offsetSubscriber: NewOffsetSubscriber(reg),
		offsetSeeker:     NewOffsetSeeker(reg),
	}
}
// PublishRecord publishes a record and assigns it an offset.
// Assignment failures are reported in-band through the response's Error field
// (with a nil Go error), matching the batch publish path.
func (integration *SMQOffsetIntegration) PublishRecord(
	namespace, topicName string,
	partition *schema_pb.Partition,
	key []byte,
	value *schema_pb.RecordValue,
) (*mq_agent_pb.PublishRecordResponse, error) {
	// Reserve exactly one offset for this record.
	assignResult := integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition)
	if assignResult.Error != nil {
		return &mq_agent_pb.PublishRecordResponse{
			Error: fmt.Sprintf("Failed to assign offset: %v", assignResult.Error),
		}, nil
	}
	assigned := assignResult.Assignment

	// Note: Removed in-memory mapping storage to prevent memory leaks
	// Record-to-offset mappings are now handled by persistent storage layer

	// For a single record, base == last == ack offset.
	return &mq_agent_pb.PublishRecordResponse{
		AckSequence: assigned.Offset, // Use offset as ack sequence for now
		BaseOffset:  assigned.Offset,
		LastOffset:  assigned.Offset,
		Error:       "",
	}, nil
}
// PublishRecordBatch publishes a batch of records and assigns them a
// contiguous range of offsets. An empty batch and an assignment failure are
// both reported in-band via the response's Error field (nil Go error).
func (integration *SMQOffsetIntegration) PublishRecordBatch(
	namespace, topicName string,
	partition *schema_pb.Partition,
	records []PublishRecordRequest,
) (*mq_agent_pb.PublishRecordResponse, error) {
	if len(records) == 0 {
		return &mq_agent_pb.PublishRecordResponse{
			Error: "Empty record batch",
		}, nil
	}

	// Reserve one offset per record in a single registry call.
	assignResult := integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, int64(len(records)))
	if assignResult.Error != nil {
		return &mq_agent_pb.PublishRecordResponse{
			Error: fmt.Sprintf("Failed to assign batch offsets: %v", assignResult.Error),
		}, nil
	}
	assignedBatch := assignResult.Batch

	// Note: Removed in-memory mapping storage to prevent memory leaks
	// Batch record-to-offset mappings are now handled by persistent storage layer

	return &mq_agent_pb.PublishRecordResponse{
		AckSequence: assignedBatch.LastOffset, // Use last offset as ack sequence
		BaseOffset:  assignedBatch.BaseOffset,
		LastOffset:  assignedBatch.LastOffset,
		Error:       "",
	}, nil
}
// CreateSubscription creates an offset-based subscription.
//
// It delegates directly to the underlying OffsetSubscriber; offsetType selects
// how startOffset is interpreted (e.g. exact offset vs. earliest/latest —
// the exact semantics live in the subscriber, not here). Returns the new
// subscription or the subscriber's error unchanged.
func (integration *SMQOffsetIntegration) CreateSubscription(
	subscriptionID string,
	namespace, topicName string,
	partition *schema_pb.Partition,
	offsetType schema_pb.OffsetType,
	startOffset int64,
) (*OffsetSubscription, error) {
	return integration.offsetSubscriber.CreateSubscription(
		subscriptionID,
		namespace, topicName,
		partition,
		offsetType,
		startOffset,
	)
}
// SubscribeRecords subscribes to records starting from the subscription's
// current offset, returning at most maxRecords responses and advancing the
// subscription cursor by the number returned.
func (integration *SMQOffsetIntegration) SubscribeRecords(
	subscription *OffsetSubscription,
	maxRecords int64,
) ([]*mq_agent_pb.SubscribeRecordResponse, error) {
	if !subscription.IsActive {
		return nil, fmt.Errorf("subscription is not active")
	}

	// Determine which offsets this call should cover.
	offsetRange, err := subscription.GetOffsetRange(maxRecords)
	if err != nil {
		return nil, fmt.Errorf("failed to get offset range: %w", err)
	}
	if offsetRange.Count == 0 {
		// Nothing available right now; return an empty (non-nil) slice.
		return []*mq_agent_pb.SubscribeRecordResponse{}, nil
	}

	// TODO: This is where we would integrate with SMQ's actual storage layer
	// For now, return mock responses with offset information
	responses := make([]*mq_agent_pb.SubscribeRecordResponse, 0, offsetRange.Count)
	end := offsetRange.StartOffset + offsetRange.Count
	for off := offsetRange.StartOffset; off < end; off++ {
		responses = append(responses, &mq_agent_pb.SubscribeRecordResponse{
			Key:           []byte(fmt.Sprintf("key-%d", off)),
			Value:         &schema_pb.RecordValue{}, // Mock value
			TsNs:          off * 1000000,            // Mock timestamp based on offset
			Offset:        off,
			IsEndOfStream: false,
			IsEndOfTopic:  false,
			Error:         "",
		})
	}

	// Move the subscription cursor past everything we just returned.
	subscription.AdvanceOffsetBy(offsetRange.Count)
	return responses, nil
}
// GetHighWaterMark returns the high water mark for a partition — from the
// usage elsewhere in this file, the next offset to be assigned (record count),
// so the latest existing offset is hwm-1. Thin wrapper over the assigner.
func (integration *SMQOffsetIntegration) GetHighWaterMark(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
	return integration.offsetAssigner.GetHighWaterMark(namespace, topicName, partition)
}
// SeekSubscription seeks a subscription to a specific offset.
// A lookup failure is wrapped so callers can still unwrap the subscriber's
// underlying error with errors.Is/As.
func (integration *SMQOffsetIntegration) SeekSubscription(
	subscriptionID string,
	offset int64,
) error {
	sub, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
	if err != nil {
		return fmt.Errorf("subscription not found: %w", err)
	}
	return sub.SeekToOffset(offset)
}
// GetSubscriptionLag returns the lag for a subscription, i.e. whatever the
// subscription's own GetLag reports. Returns 0 plus a wrapped error when the
// subscription ID is unknown.
func (integration *SMQOffsetIntegration) GetSubscriptionLag(subscriptionID string) (int64, error) {
	sub, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
	if err != nil {
		return 0, fmt.Errorf("subscription not found: %w", err)
	}
	return sub.GetLag()
}
// CloseSubscription closes a subscription by ID. Pure delegation to the
// subscriber; the subscriber's error (if any) is returned unchanged.
func (integration *SMQOffsetIntegration) CloseSubscription(subscriptionID string) error {
	return integration.offsetSubscriber.CloseSubscription(subscriptionID)
}
// ValidateOffsetRange validates an offset range for a partition.
// Pure delegation to the seeker; whether endOffset is inclusive or exclusive
// is defined by OffsetSeeker.ValidateOffsetRange, not here.
func (integration *SMQOffsetIntegration) ValidateOffsetRange(
	namespace, topicName string,
	partition *schema_pb.Partition,
	startOffset, endOffset int64,
) error {
	return integration.offsetSeeker.ValidateOffsetRange(namespace, topicName, partition, startOffset, endOffset)
}
// GetAvailableOffsetRange returns the available offset range for a partition.
// Pure delegation to the seeker.
func (integration *SMQOffsetIntegration) GetAvailableOffsetRange(namespace, topicName string, partition *schema_pb.Partition) (*OffsetRange, error) {
	return integration.offsetSeeker.GetAvailableOffsetRange(namespace, topicName, partition)
}
// PublishRecordRequest represents a record to be published via
// PublishRecordBatch. Note that the batch path currently only uses the slice
// length for offset assignment; Key/Value are not yet persisted here.
type PublishRecordRequest struct {
	Key   []byte                 // record key (may be nil)
	Value *schema_pb.RecordValue // record payload
}
// OffsetMetrics provides metrics about offset usage, as reported by
// GetOffsetMetrics.
type OffsetMetrics struct {
	PartitionCount      int64   // number of partition managers in the registry
	TotalOffsets        int64   // sum of high water marks across partitions
	ActiveSubscriptions int64   // subscriptions with IsActive == true
	AverageLatency      float64 // always 0 for now (latency tracking not implemented)
}
// GetOffsetMetrics returns metrics about offset usage: active subscription
// count, partition count, and the total number of offsets assigned (sum of
// per-partition high water marks).
//
// NOTE(review): this iterates offsetSubscriber.subscriptions and
// offsetAssigner.registry.managers while holding only integration.mu. That is
// safe only if those maps are mutated exclusively under the same mutex —
// confirm the subscriber/registry do not have their own locks that writers
// use instead.
func (integration *SMQOffsetIntegration) GetOffsetMetrics() *OffsetMetrics {
	integration.mu.RLock()
	defer integration.mu.RUnlock()
	// Count active subscriptions
	activeSubscriptions := int64(0)
	for _, subscription := range integration.offsetSubscriber.subscriptions {
		if subscription.IsActive {
			activeSubscriptions++
		}
	}
	// Calculate total offsets from all partition managers instead of in-memory map
	var totalOffsets int64
	for _, manager := range integration.offsetAssigner.registry.managers {
		totalOffsets += manager.GetHighWaterMark()
	}
	return &OffsetMetrics{
		PartitionCount:      int64(len(integration.offsetAssigner.registry.managers)),
		TotalOffsets:        totalOffsets, // Now calculated from storage, not memory maps
		ActiveSubscriptions: activeSubscriptions,
		AverageLatency:      0.0, // TODO: Implement latency tracking
	}
}
// OffsetInfo provides detailed information about a single offset within a
// partition, as returned by GetOffsetInfo.
type OffsetInfo struct {
	Offset    int64                // the offset that was queried
	Timestamp int64                // ns timestamp; currently a placeholder (see GetOffsetInfo)
	Partition *schema_pb.Partition // partition the offset belongs to
	Exists    bool                 // true when 0 <= Offset < high water mark
}
// GetOffsetInfo returns detailed information about a specific offset within a
// partition. An offset "exists" when it is non-negative and below the current
// high water mark. The timestamp is currently a placeholder (now) rather than
// the record's real timestamp.
func (integration *SMQOffsetIntegration) GetOffsetInfo(
	namespace, topicName string,
	partition *schema_pb.Partition,
	offset int64,
) (*OffsetInfo, error) {
	hwm, err := integration.GetHighWaterMark(namespace, topicName, partition)
	if err != nil {
		return nil, fmt.Errorf("failed to get high water mark: %w", err)
	}

	info := &OffsetInfo{
		Offset:    offset,
		Partition: partition,
		Exists:    offset >= 0 && offset < hwm,
	}

	// TODO: Get actual timestamp from storage
	// Note: Timestamp lookup from in-memory map removed to prevent memory leaks
	// For now, use a placeholder timestamp. In production, this should come from
	// persistent storage if timestamp tracking is needed.
	if info.Exists {
		info.Timestamp = time.Now().UnixNano() // Placeholder - should come from storage
	}
	return info, nil
}
// PartitionOffsetInfo provides offset information for a partition, as
// returned by GetPartitionOffsetInfo.
type PartitionOffsetInfo struct {
	Partition           *schema_pb.Partition
	EarliestOffset      int64 // always 0 in the current implementation
	LatestOffset        int64 // hwm-1, or -1 when the partition is empty
	HighWaterMark       int64
	RecordCount         int64 // equals HighWaterMark
	ActiveSubscriptions int64 // active subscriptions attached to this partition
}
// GetPartitionOffsetInfo returns comprehensive offset information for a
// partition: the offset window derived from the high water mark plus the
// number of active subscriptions currently attached to that partition.
func (integration *SMQOffsetIntegration) GetPartitionOffsetInfo(namespace, topicName string, partition *schema_pb.Partition) (*PartitionOffsetInfo, error) {
	hwm, err := integration.GetHighWaterMark(namespace, topicName, partition)
	if err != nil {
		return nil, fmt.Errorf("failed to get high water mark: %w", err)
	}

	// The earliest offset is always 0; the latest is one below the high water
	// mark, with -1 signalling an empty partition.
	earliest := int64(0)
	latest := hwm - 1
	if hwm == 0 {
		latest = -1 // No records
	}

	// Count active subscriptions for this partition.
	active := int64(0)
	integration.mu.RLock()
	for _, sub := range integration.offsetSubscriber.subscriptions {
		if sub.IsActive && partitionKey(sub.Partition) == partitionKey(partition) {
			active++
		}
	}
	integration.mu.RUnlock()

	return &PartitionOffsetInfo{
		Partition:           partition,
		EarliestOffset:      earliest,
		LatestOffset:        latest,
		HighWaterMark:       hwm,
		RecordCount:         hwm,
		ActiveSubscriptions: active,
	}, nil
}
// GetSubscription retrieves an existing subscription by ID. Pure delegation
// to the subscriber; its error is returned unchanged.
func (integration *SMQOffsetIntegration) GetSubscription(subscriptionID string) (*OffsetSubscription, error) {
	return integration.offsetSubscriber.GetSubscription(subscriptionID)
}
// ListActiveSubscriptions returns all subscriptions whose IsActive flag is
// set. The result is a non-nil slice (empty when there are none); ordering is
// unspecified because it comes from map iteration.
func (integration *SMQOffsetIntegration) ListActiveSubscriptions() ([]*OffsetSubscription, error) {
	integration.mu.RLock()
	defer integration.mu.RUnlock()

	active := make([]*OffsetSubscription, 0)
	for _, sub := range integration.offsetSubscriber.subscriptions {
		if !sub.IsActive {
			continue
		}
		active = append(active, sub)
	}
	return active, nil
}
// AssignSingleOffset assigns a single offset for a partition. Pure delegation
// to the assigner; inspect the returned AssignmentResult's Error field for
// failures.
func (integration *SMQOffsetIntegration) AssignSingleOffset(namespace, topicName string, partition *schema_pb.Partition) *AssignmentResult {
	return integration.offsetAssigner.AssignSingleOffset(namespace, topicName, partition)
}
// AssignBatchOffsets assigns a batch of count offsets for a partition. Pure
// delegation to the assigner; inspect the returned AssignmentResult's Error
// field for failures.
func (integration *SMQOffsetIntegration) AssignBatchOffsets(namespace, topicName string, partition *schema_pb.Partition, count int64) *AssignmentResult {
	return integration.offsetAssigner.AssignBatchOffsets(namespace, topicName, partition, count)
}
// Reset resets the integration layer state (for testing): deactivates and
// drops all subscriptions, then replaces the shared registry with a fresh one
// built on the same storage so partition managers start over.
//
// NOTE(review): this mutates offsetSubscriber and offsetSeeker internals
// while holding only integration.mu; confirm no other goroutine touches those
// components under a different lock while Reset runs (acceptable if Reset is
// test-only as documented).
func (integration *SMQOffsetIntegration) Reset() {
	integration.mu.Lock()
	defer integration.mu.Unlock()
	// Note: No in-memory maps to clear (removed to prevent memory leaks)
	// Close all subscriptions
	for _, subscription := range integration.offsetSubscriber.subscriptions {
		subscription.IsActive = false
	}
	integration.offsetSubscriber.subscriptions = make(map[string]*OffsetSubscription)
	// Reset the registries by creating new ones with the same storage
	// This ensures that partition managers start fresh
	registry := NewPartitionOffsetRegistry(integration.offsetAssigner.registry.storage)
	integration.offsetAssigner.registry = registry
	integration.offsetSubscriber.offsetRegistry = registry
	integration.offsetSeeker.offsetRegistry = registry
}