diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go new file mode 100644 index 000000000..6ca6b2a8b --- /dev/null +++ b/weed/mq/offset/integration.go @@ -0,0 +1,350 @@ +package offset + +import ( + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// SMQOffsetIntegration provides integration between offset management and SMQ broker +type SMQOffsetIntegration struct { + mu sync.RWMutex + offsetAssigner *OffsetAssigner + offsetSubscriber *OffsetSubscriber + offsetSeeker *OffsetSeeker + + // Mapping between SMQ records and offsets + recordOffsetMap map[string]int64 // record key -> offset + offsetRecordMap map[string]map[int64]int64 // partition key -> offset -> record timestamp +} + +// NewSMQOffsetIntegration creates a new SMQ offset integration +func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration { + registry := NewPartitionOffsetRegistry(storage) + assigner := &OffsetAssigner{registry: registry} + + return &SMQOffsetIntegration{ + offsetAssigner: assigner, + offsetSubscriber: NewOffsetSubscriber(registry), + offsetSeeker: NewOffsetSeeker(registry), + recordOffsetMap: make(map[string]int64), + offsetRecordMap: make(map[string]map[int64]int64), + } +} + +// PublishRecord publishes a record and assigns it an offset +func (integration *SMQOffsetIntegration) PublishRecord( + partition *schema_pb.Partition, + key []byte, + value *schema_pb.RecordValue, +) (*mq_agent_pb.PublishRecordResponse, error) { + + // Assign offset for this record + result := integration.offsetAssigner.AssignSingleOffset(partition) + if result.Error != nil { + return &mq_agent_pb.PublishRecordResponse{ + Error: fmt.Sprintf("Failed to assign offset: %v", result.Error), + }, nil + } + + assignment := result.Assignment + + // Store the mapping for later retrieval + integration.mu.Lock() + recordKey := string(key) + integration.recordOffsetMap[recordKey] = assignment.Offset + + partitionKey := partitionKey(partition) + if integration.offsetRecordMap[partitionKey] == nil { + integration.offsetRecordMap[partitionKey] = make(map[int64]int64) + } + integration.offsetRecordMap[partitionKey][assignment.Offset] = assignment.Timestamp + integration.mu.Unlock() + + // Return response with offset information + return &mq_agent_pb.PublishRecordResponse{ + AckSequence: assignment.Offset, // Use offset as ack sequence for now + BaseOffset: assignment.Offset, + LastOffset: assignment.Offset, + Error: "", + }, nil +} + +// PublishRecordBatch publishes a batch of records and assigns them offsets +func (integration *SMQOffsetIntegration) PublishRecordBatch( + partition *schema_pb.Partition, + records []PublishRecordRequest, +) (*mq_agent_pb.PublishRecordResponse, error) { + + if len(records) == 0 { + return &mq_agent_pb.PublishRecordResponse{ + Error: "Empty record batch", + }, nil + } + + // Assign batch of offsets + result := integration.offsetAssigner.AssignBatchOffsets(partition, int64(len(records))) + if result.Error != nil { + return &mq_agent_pb.PublishRecordResponse{ + Error: fmt.Sprintf("Failed to assign batch offsets: %v", result.Error), + }, nil + } + + batch := result.Batch + + // Store mappings for all records in the batch + integration.mu.Lock() + for i, record := range records { + recordKey := string(record.Key) + offset := batch.BaseOffset + int64(i) + integration.recordOffsetMap[recordKey] = offset + } + + partitionKey := partitionKey(partition) + if integration.offsetRecordMap[partitionKey] == nil { + integration.offsetRecordMap[partitionKey] = make(map[int64]int64) + } + for i := int64(0); i < batch.Count; i++ { + offset := batch.BaseOffset + i + integration.offsetRecordMap[partitionKey][offset] = batch.Timestamp + } + integration.mu.Unlock() + + return &mq_agent_pb.PublishRecordResponse{ + AckSequence: batch.LastOffset, // Use last offset as ack sequence + BaseOffset: batch.BaseOffset, + LastOffset: batch.LastOffset, + Error: "", + }, nil +} + +// CreateSubscription creates an offset-based subscription +func (integration *SMQOffsetIntegration) CreateSubscription( + subscriptionID string, + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + startOffset int64, +) (*OffsetSubscription, error) { + + return integration.offsetSubscriber.CreateSubscription( + subscriptionID, + partition, + offsetType, + startOffset, + ) +} + +// SubscribeRecords subscribes to records starting from a specific offset +func (integration *SMQOffsetIntegration) SubscribeRecords( + subscription *OffsetSubscription, + maxRecords int64, +) ([]*mq_agent_pb.SubscribeRecordResponse, error) { + + if !subscription.IsActive { + return nil, fmt.Errorf("subscription is not active") + } + + // Get the range of offsets to read + offsetRange, err := subscription.GetOffsetRange(maxRecords) + if err != nil { + return nil, fmt.Errorf("failed to get offset range: %w", err) + } + + if offsetRange.Count == 0 { + // No records available + return []*mq_agent_pb.SubscribeRecordResponse{}, nil + } + + // TODO: This is where we would integrate with SMQ's actual storage layer + // For now, return mock responses with offset information + responses := make([]*mq_agent_pb.SubscribeRecordResponse, offsetRange.Count) + + for i := int64(0); i < offsetRange.Count; i++ { + offset := offsetRange.StartOffset + i + + responses[i] = &mq_agent_pb.SubscribeRecordResponse{ + Key: []byte(fmt.Sprintf("key-%d", offset)), + Value: &schema_pb.RecordValue{}, // Mock value + TsNs: offset * 1000000, // Mock timestamp based on offset + Offset: offset, + IsEndOfStream: false, + IsEndOfTopic: false, + Error: "", + } + } + + // Advance the subscription + subscription.AdvanceOffsetBy(offsetRange.Count) + + return responses, nil +} + +// GetHighWaterMark returns the high water mark for a partition +func (integration *SMQOffsetIntegration) GetHighWaterMark(partition *schema_pb.Partition) (int64, error) { + return integration.offsetAssigner.GetHighWaterMark(partition) +} + +// SeekSubscription seeks a subscription to a specific offset +func (integration *SMQOffsetIntegration) SeekSubscription( + subscriptionID string, + offset int64, +) error { + + subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID) + if err != nil { + return fmt.Errorf("subscription not found: %w", err) + } + + return subscription.SeekToOffset(offset) +} + +// GetSubscriptionLag returns the lag for a subscription +func (integration *SMQOffsetIntegration) GetSubscriptionLag(subscriptionID string) (int64, error) { + subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID) + if err != nil { + return 0, fmt.Errorf("subscription not found: %w", err) + } + + return subscription.GetLag() +} + +// CloseSubscription closes a subscription +func (integration *SMQOffsetIntegration) CloseSubscription(subscriptionID string) error { + return integration.offsetSubscriber.CloseSubscription(subscriptionID) +} + +// ValidateOffsetRange validates an offset range for a partition +func (integration *SMQOffsetIntegration) ValidateOffsetRange( + partition *schema_pb.Partition, + startOffset, endOffset int64, +) error { + + return integration.offsetSeeker.ValidateOffsetRange(partition, startOffset, endOffset) +} + +// GetAvailableOffsetRange returns the available offset range for a partition +func (integration *SMQOffsetIntegration) GetAvailableOffsetRange(partition *schema_pb.Partition) (*OffsetRange, error) { + return integration.offsetSeeker.GetAvailableOffsetRange(partition) +} + +// PublishRecordRequest represents a record to be published +type PublishRecordRequest struct { + Key []byte + Value *schema_pb.RecordValue +} + +// OffsetMetrics provides metrics about offset usage +type OffsetMetrics struct { + PartitionCount int64 + TotalOffsets int64 + ActiveSubscriptions int64 + AverageLatency float64 +} + +// GetOffsetMetrics returns metrics about offset usage +func (integration *SMQOffsetIntegration) GetOffsetMetrics() *OffsetMetrics { + integration.mu.RLock() + defer integration.mu.RUnlock() + + // Count active subscriptions + activeSubscriptions := int64(0) + for _, subscription := range integration.offsetSubscriber.subscriptions { + if subscription.IsActive { + activeSubscriptions++ + } + } + + return &OffsetMetrics{ + PartitionCount: int64(len(integration.offsetAssigner.registry.managers)), + TotalOffsets: int64(len(integration.recordOffsetMap)), + ActiveSubscriptions: activeSubscriptions, + AverageLatency: 0.0, // TODO: Implement latency tracking + } +} + +// OffsetInfo provides detailed information about an offset +type OffsetInfo struct { + Offset int64 + Timestamp int64 + Partition *schema_pb.Partition + Exists bool +} + +// GetOffsetInfo returns detailed information about a specific offset +func (integration *SMQOffsetIntegration) GetOffsetInfo( + partition *schema_pb.Partition, + offset int64, +) (*OffsetInfo, error) { + + hwm, err := integration.GetHighWaterMark(partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + exists := offset >= 0 && offset < hwm + + // TODO: Get actual timestamp from storage + timestamp := int64(0) + if exists { + integration.mu.RLock() + partitionKey := partitionKey(partition) + if offsetMap, found := integration.offsetRecordMap[partitionKey]; found { + if ts, found := offsetMap[offset]; found { + timestamp = ts + } + } + integration.mu.RUnlock() + } + + return &OffsetInfo{ + Offset: offset, + Timestamp: timestamp, + Partition: partition, + Exists: exists, + }, nil +} + +// PartitionOffsetInfo provides offset information for a partition +type PartitionOffsetInfo struct { + Partition *schema_pb.Partition + EarliestOffset int64 + LatestOffset int64 + HighWaterMark int64 + RecordCount int64 + ActiveSubscriptions int64 +} + +// GetPartitionOffsetInfo returns comprehensive offset information for a partition +func (integration *SMQOffsetIntegration) GetPartitionOffsetInfo(partition *schema_pb.Partition) (*PartitionOffsetInfo, error) { + hwm, err := integration.GetHighWaterMark(partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + earliestOffset := int64(0) + latestOffset := hwm - 1 + if hwm == 0 { + latestOffset = -1 // No records + } + + // Count active subscriptions for this partition + activeSubscriptions := int64(0) + integration.mu.RLock() + for _, subscription := range integration.offsetSubscriber.subscriptions { + if subscription.IsActive && partitionKey(subscription.Partition) == partitionKey(partition) { + activeSubscriptions++ + } + } + integration.mu.RUnlock() + + return &PartitionOffsetInfo{ + Partition: partition, + EarliestOffset: earliestOffset, + LatestOffset: latestOffset, + HighWaterMark: hwm, + RecordCount: hwm, + ActiveSubscriptions: activeSubscriptions, + }, nil +} diff --git a/weed/mq/offset/integration_test.go b/weed/mq/offset/integration_test.go new file mode 100644 index 000000000..96537e2fc --- /dev/null +++ b/weed/mq/offset/integration_test.go @@ -0,0 +1,537 @@ +package offset + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestSMQOffsetIntegration_PublishRecord(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish a single record + response, err := integration.PublishRecord( + partition, + []byte("test-key"), + &schema_pb.RecordValue{}, + ) + + if err != nil { + t.Fatalf("Failed to publish record: %v", err) + } + + if response.Error != "" { + t.Errorf("Expected no error, got: %s", response.Error) + } + + if response.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", response.BaseOffset) + } + + if response.LastOffset != 0 { + t.Errorf("Expected last offset 0, got %d", response.LastOffset) + } +} + +func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create batch of records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + + // Publish batch + response, err := integration.PublishRecordBatch(partition, records) + if err != nil { + t.Fatalf("Failed to publish record batch: %v", err) + } + + if response.Error != "" { + t.Errorf("Expected no error, got: %s", response.Error) + } + + if response.BaseOffset != 0 { + t.Errorf("Expected base offset 0, got %d", response.BaseOffset) + } + + if response.LastOffset != 2 { + t.Errorf("Expected last offset 2, got %d", response.LastOffset) + } + + // Verify high water mark + hwm, err := integration.GetHighWaterMark(partition) + if err != nil { + t.Fatalf("Failed to get high water mark: %v", err) + } + + if hwm != 3 { + t.Errorf("Expected high water mark 3, got %d", hwm) + } +} + +func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish empty batch + response, err := integration.PublishRecordBatch(partition, []PublishRecordRequest{}) + if err != nil { + t.Fatalf("Failed to publish empty batch: %v", err) + } + + if response.Error == "" { + t.Error("Expected error for empty batch") + } +} + +func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records first + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "test-sub", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + if sub.ID != "test-sub" { + t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID) + } + + if sub.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", sub.StartOffset) + } +} + +func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "test-sub", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Subscribe to records + responses, err := integration.SubscribeRecords(sub, 2) + if err != nil { + t.Fatalf("Failed to subscribe to records: %v", err) + } + + if len(responses) != 2 { + t.Errorf("Expected 2 responses, got %d", len(responses)) + } + + // Check offset progression + if responses[0].Offset != 0 { + t.Errorf("Expected first record offset 0, got %d", responses[0].Offset) + } + + if responses[1].Offset != 1 { + t.Errorf("Expected second record offset 1, got %d", responses[1].Offset) + } + + // Check subscription advancement + if sub.CurrentOffset != 2 { + t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset) + } +} + +func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create subscription on empty partition + sub, err := integration.CreateSubscription( + "empty-sub", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Subscribe to records (should return empty) + responses, err := integration.SubscribeRecords(sub, 10) + if err != nil { + t.Fatalf("Failed to subscribe to empty partition: %v", err) + } + + if len(responses) != 0 { + t.Errorf("Expected 0 responses from empty partition, got %d", len(responses)) + } +} + +func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key4"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key5"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscription + sub, err := integration.CreateSubscription( + "seek-sub", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Seek to offset 3 + err = integration.SeekSubscription("seek-sub", 3) + if err != nil { + t.Fatalf("Failed to seek subscription: %v", err) + } + + if sub.CurrentOffset != 3 { + t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset) + } + + // Subscribe from new position + responses, err := integration.SubscribeRecords(sub, 2) + if err != nil { + t.Fatalf("Failed to subscribe after seek: %v", err) + } + + if len(responses) != 2 { + t.Errorf("Expected 2 responses after seek, got %d", len(responses)) + } + + if responses[0].Offset != 3 { + t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset) + } +} + +func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscription at offset 1 + sub, err := integration.CreateSubscription( + "lag-sub", + partition, + schema_pb.OffsetType_EXACT_OFFSET, + 1, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Get lag + lag, err := integration.GetSubscriptionLag("lag-sub") + if err != nil { + t.Fatalf("Failed to get subscription lag: %v", err) + } + + expectedLag := int64(3 - 1) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d, got %d", expectedLag, lag) + } + + // Advance subscription and check lag again + integration.SubscribeRecords(sub, 1) + + lag, err = integration.GetSubscriptionLag("lag-sub") + if err != nil { + t.Fatalf("Failed to get lag after advance: %v", err) + } + + expectedLag = int64(3 - 2) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag) + } +} + +func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Create subscription + _, err := integration.CreateSubscription( + "close-sub", + partition, + schema_pb.OffsetType_RESET_TO_EARLIEST, + 0, + ) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Close subscription + err = integration.CloseSubscription("close-sub") + if err != nil { + t.Fatalf("Failed to close subscription: %v", err) + } + + // Try to get lag (should fail) + _, err = integration.GetSubscriptionLag("close-sub") + if err == nil { + t.Error("Expected error when getting lag for closed subscription") + } +} + +func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Publish some records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Test valid range + err := integration.ValidateOffsetRange(partition, 0, 2) + if err != nil { + t.Errorf("Valid range should not return error: %v", err) + } + + // Test invalid range (beyond hwm) + err = integration.ValidateOffsetRange(partition, 0, 5) + if err == nil { + t.Error("Expected error for range beyond high water mark") + } +} + +func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test empty partition + offsetRange, err := integration.GetAvailableOffsetRange(partition) + if err != nil { + t.Fatalf("Failed to get available range for empty partition: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Test with data + offsetRange, err = integration.GetAvailableOffsetRange(partition) + if err != nil { + t.Fatalf("Failed to get available range: %v", err) + } + + if offsetRange.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset) + } + + if offsetRange.EndOffset != 1 { + t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset) + } + + if offsetRange.Count != 2 { + t.Errorf("Expected count 2, got %d", offsetRange.Count) + } +} + +func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Initial metrics + metrics := integration.GetOffsetMetrics() + if metrics.TotalOffsets != 0 { + t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets) + } + + if metrics.ActiveSubscriptions != 0 { + t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscriptions + integration.CreateSubscription("sub1", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + integration.CreateSubscription("sub2", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Check updated metrics + metrics = integration.GetOffsetMetrics() + if metrics.TotalOffsets != 2 { + t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets) + } + + if metrics.ActiveSubscriptions != 2 { + t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions) + } + + if metrics.PartitionCount != 1 { + t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount) + } +} + +func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test non-existent offset + info, err := integration.GetOffsetInfo(partition, 0) + if err != nil { + t.Fatalf("Failed to get offset info: %v", err) + } + + if info.Exists { + t.Error("Offset should not exist in empty partition") + } + + // Publish record + integration.PublishRecord(partition, []byte("key1"), &schema_pb.RecordValue{}) + + // Test existing offset + info, err = integration.GetOffsetInfo(partition, 0) + if err != nil { + t.Fatalf("Failed to get offset info for existing offset: %v", err) + } + + if !info.Exists { + t.Error("Offset should exist after publishing") + } + + if info.Offset != 0 { + t.Errorf("Expected offset 0, got %d", info.Offset) + } +} + +func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) { + storage := NewInMemoryOffsetStorage() + integration := NewSMQOffsetIntegration(storage) + partition := createTestPartition() + + // Test empty partition + info, err := integration.GetPartitionOffsetInfo(partition) + if err != nil { + t.Fatalf("Failed to get partition offset info: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != -1 { + t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 0 { + t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark) + } + + if info.RecordCount != 0 { + t.Errorf("Expected record count 0, got %d", info.RecordCount) + } + + // Publish records + records := []PublishRecordRequest{ + {Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, + {Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, + } + integration.PublishRecordBatch(partition, records) + + // Create subscription + integration.CreateSubscription("test-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + + // Test with data + info, err = integration.GetPartitionOffsetInfo(partition) + if err != nil { + t.Fatalf("Failed to get partition offset info with data: %v", err) + } + + if info.EarliestOffset != 0 { + t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) + } + + if info.LatestOffset != 2 { + t.Errorf("Expected latest offset 2, got %d", info.LatestOffset) + } + + if info.HighWaterMark != 3 { + t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark) + } + + if info.RecordCount != 3 { + t.Errorf("Expected record count 3, got %d", info.RecordCount) + } + + if info.ActiveSubscriptions != 1 { + t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions) + } +} diff --git a/weed/mq/offset/subscriber.go b/weed/mq/offset/subscriber.go new file mode 100644 index 000000000..5d9cf789b --- /dev/null +++ b/weed/mq/offset/subscriber.go @@ -0,0 +1,349 @@ +package offset + +import ( + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// OffsetSubscriber handles offset-based subscription logic +type OffsetSubscriber struct { + mu sync.RWMutex + offsetRegistry *PartitionOffsetRegistry + subscriptions map[string]*OffsetSubscription +} + +// OffsetSubscription represents an active offset-based subscription +type OffsetSubscription struct { + ID string + Partition *schema_pb.Partition + StartOffset int64 + CurrentOffset int64 + OffsetType schema_pb.OffsetType + IsActive bool + offsetRegistry *PartitionOffsetRegistry +} + +// NewOffsetSubscriber creates a new offset-based subscriber +func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber { + return &OffsetSubscriber{ + offsetRegistry: offsetRegistry, + subscriptions: make(map[string]*OffsetSubscription), + } +} + +// CreateSubscription creates a new offset-based subscription +func (s *OffsetSubscriber) CreateSubscription( + subscriptionID string, + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + startOffset int64, +) (*OffsetSubscription, error) { + + s.mu.Lock() + defer s.mu.Unlock() + + // Check if subscription already exists + if _, exists := s.subscriptions[subscriptionID]; exists { + return nil, fmt.Errorf("subscription %s already exists", subscriptionID) + } + + // Resolve the actual start offset based on type + actualStartOffset, err := s.resolveStartOffset(partition, offsetType, startOffset) + if err != nil { + return nil, fmt.Errorf("failed to resolve start offset: %w", err) + } + + subscription := &OffsetSubscription{ + ID: subscriptionID, + Partition: partition, + StartOffset: actualStartOffset, + CurrentOffset: actualStartOffset, + OffsetType: offsetType, + IsActive: true, + offsetRegistry: s.offsetRegistry, + } + + s.subscriptions[subscriptionID] = subscription + return subscription, nil +} + +// GetSubscription retrieves an existing subscription +func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + subscription, exists := s.subscriptions[subscriptionID] + if !exists { + return nil, fmt.Errorf("subscription %s not found", subscriptionID) + } + + return subscription, nil +} + +// CloseSubscription closes and removes a subscription +func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + subscription, exists := s.subscriptions[subscriptionID] + if !exists { + return fmt.Errorf("subscription %s not found", subscriptionID) + } + + subscription.IsActive = false + delete(s.subscriptions, subscriptionID) + return nil +} + +// resolveStartOffset resolves the actual start offset based on OffsetType +func (s *OffsetSubscriber) resolveStartOffset( + partition *schema_pb.Partition, + offsetType schema_pb.OffsetType, + requestedOffset int64, +) (int64, error) { + + switch offsetType { + case schema_pb.OffsetType_EXACT_OFFSET: + // Validate that the requested offset exists + return s.validateAndGetOffset(partition, requestedOffset) + + case schema_pb.OffsetType_RESET_TO_OFFSET: + // Use the requested offset, even if it doesn't exist yet + return requestedOffset, nil + + case schema_pb.OffsetType_RESET_TO_EARLIEST: + // Start from offset 0 + return 0, nil + + case schema_pb.OffsetType_RESET_TO_LATEST: + // Start from the current high water mark + hwm, err := s.offsetRegistry.GetHighWaterMark(partition) + if err != nil { + return 0, err + } + return hwm, nil + + case schema_pb.OffsetType_RESUME_OR_EARLIEST: + // Try to resume from a saved position, fallback to earliest + // For now, just use earliest (consumer group position tracking will be added later) + return 0, nil + + case schema_pb.OffsetType_RESUME_OR_LATEST: + // Try to resume from a saved position, fallback to latest + // For now, just use latest + hwm, err := s.offsetRegistry.GetHighWaterMark(partition) + if err != nil { + return 0, err + } + return hwm, nil + + default: + return 0, fmt.Errorf("unsupported offset type: %v", offsetType) + } +} + +// validateAndGetOffset validates that an offset exists and returns it +func (s *OffsetSubscriber) validateAndGetOffset(partition *schema_pb.Partition, offset int64) (int64, error) { + if offset < 0 { + return 0, fmt.Errorf("offset cannot be negative: %d", offset) + } + + // Get the current high water mark + hwm, err := s.offsetRegistry.GetHighWaterMark(partition) + if err != nil { + return 0, fmt.Errorf("failed to get high water mark: %w", err) + } + + // Check if offset is within valid range + if offset >= hwm { + return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) + } + + return offset, nil +} + +// SeekToOffset seeks a subscription to a specific offset +func (sub *OffsetSubscription) SeekToOffset(offset int64) error { + if !sub.IsActive { + return fmt.Errorf("subscription is not active") + } + + // Validate the offset + if offset < 0 { + return fmt.Errorf("offset cannot be negative: %d", offset) + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) + if err != nil { + return fmt.Errorf("failed to get high water mark: %w", err) + } + + if offset > hwm { + return fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm) + } + + sub.CurrentOffset = offset + return nil +} + +// GetNextOffset returns the next offset to read +func (sub *OffsetSubscription) GetNextOffset() int64 { + return sub.CurrentOffset +} + +// AdvanceOffset advances the subscription to the next offset +func (sub *OffsetSubscription) AdvanceOffset() { + sub.CurrentOffset++ +} + +// GetLag returns the lag between current position and high water mark +func (sub *OffsetSubscription) GetLag() (int64, error) { + if !sub.IsActive { + return 0, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) + if err != nil { + return 0, fmt.Errorf("failed to get high water mark: %w", err) + } + + lag := hwm - sub.CurrentOffset + if lag < 0 { + lag = 0 + } + + return lag, nil +} + +// IsAtEnd checks if the subscription has reached the end of available data +func (sub *OffsetSubscription) IsAtEnd() (bool, error) { + if !sub.IsActive { + return true, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) + if err != nil { + return false, fmt.Errorf("failed to get high water mark: %w", err) + } + + return sub.CurrentOffset >= hwm, nil +} + +// OffsetRange represents a range of offsets +type OffsetRange struct { + StartOffset int64 + EndOffset int64 + Count int64 +} + +// GetOffsetRange returns a range of offsets for batch reading +func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) { + if !sub.IsActive { + return nil, fmt.Errorf("subscription is not active") + } + + hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + startOffset := sub.CurrentOffset + endOffset := startOffset + maxCount - 1 + + // Don't go beyond high water mark + if endOffset >= hwm { + endOffset = hwm - 1 + } + + // If start is already at or beyond HWM, return empty range + if startOffset >= hwm { + return &OffsetRange{ + StartOffset: startOffset, + EndOffset: startOffset - 1, // Empty range + Count: 0, + }, nil + } + + count := endOffset - startOffset + 1 + return &OffsetRange{ + StartOffset: startOffset, + EndOffset: endOffset, + Count: count, + }, nil +} + +// AdvanceOffsetBy advances the subscription by a specific number of offsets +func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) { + sub.CurrentOffset += count +} + +// OffsetSeeker provides utilities for offset-based seeking +type OffsetSeeker struct { + offsetRegistry *PartitionOffsetRegistry +} + +// NewOffsetSeeker creates a new offset seeker +func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker { + return &OffsetSeeker{ + offsetRegistry: offsetRegistry, + } +} + +// SeekToTimestamp finds the offset closest to a given timestamp +// This bridges offset-based and timestamp-based seeking +func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) { + // TODO: This requires integration with the storage layer to map timestamps to offsets + // For now, return an error indicating this feature needs implementation + return 0, fmt.Errorf("timestamp-to-offset mapping not implemented yet") +} + +// ValidateOffsetRange validates that an offset range is valid +func (seeker *OffsetSeeker) ValidateOffsetRange(partition *schema_pb.Partition, startOffset, endOffset int64) error { + if startOffset < 0 { + return fmt.Errorf("start offset cannot be negative: %d", startOffset) + } + + if endOffset < startOffset { + return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset) + } + + hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition) + if err != nil { + return fmt.Errorf("failed to get high water mark: %w", err) + } + + if startOffset >= hwm { + return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm) + } + + if endOffset >= hwm { + return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm) + } + + return nil +} + +// GetAvailableOffsetRange returns the range of available offsets for a partition +func (seeker *OffsetSeeker) GetAvailableOffsetRange(partition *schema_pb.Partition) (*OffsetRange, error) { + hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition) + if err != nil { + return nil, fmt.Errorf("failed to get high water mark: %w", err) + } + + if hwm == 0 { + // No data available + return &OffsetRange{ + StartOffset: 0, + EndOffset: -1, + Count: 0, + }, nil + } + + return &OffsetRange{ + StartOffset: 0, + EndOffset: hwm - 1, + Count: hwm, + }, nil +} diff --git a/weed/mq/offset/subscriber_test.go b/weed/mq/offset/subscriber_test.go new file mode 100644 index 000000000..8ab7958d3 --- /dev/null +++ b/weed/mq/offset/subscriber_test.go @@ -0,0 +1,457 @@ +package offset + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestOffsetSubscriber_CreateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign some offsets first + registry.AssignOffsets(partition, 10) + + // Test EXACT_OFFSET subscription + sub, err := subscriber.CreateSubscription("test-sub-1", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create EXACT_OFFSET subscription: %v", err) + } + + if sub.StartOffset != 5 { + t.Errorf("Expected start offset 5, got %d", sub.StartOffset) + } + if sub.CurrentOffset != 5 { + t.Errorf("Expected current offset 5, got %d", sub.CurrentOffset) + } + + // Test RESET_TO_LATEST subscription + sub2, err := subscriber.CreateSubscription("test-sub-2", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create RESET_TO_LATEST subscription: %v", err) + } + + if sub2.StartOffset != 10 { // Should be at high water mark + t.Errorf("Expected start offset 10, got %d", sub2.StartOffset) + } +} + +func TestOffsetSubscriber_InvalidSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign some offsets + registry.AssignOffsets(partition, 5) + + // Test invalid offset (beyond high water mark) + _, err := subscriber.CreateSubscription("invalid-sub", partition, schema_pb.OffsetType_EXACT_OFFSET, 10) + if err == nil { + t.Error("Expected error for offset beyond high water mark") + } + + // Test negative offset + _, err = subscriber.CreateSubscription("invalid-sub-2", partition, schema_pb.OffsetType_EXACT_OFFSET, -1) + if err == nil { + t.Error("Expected error for negative offset") + } +} + +func TestOffsetSubscriber_DuplicateSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create first subscription + _, err := subscriber.CreateSubscription("duplicate-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create first subscription: %v", err) + } + + // Try to create duplicate + _, err = subscriber.CreateSubscription("duplicate-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err == nil { + t.Error("Expected error for duplicate subscription ID") + } +} + +func TestOffsetSubscription_SeekToOffset(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 20) + + // Create subscription + sub, err := subscriber.CreateSubscription("seek-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Test valid seek + err = sub.SeekToOffset(10) + if err != nil { + t.Fatalf("Failed to seek to offset 10: %v", err) + } + + if sub.CurrentOffset != 10 { + t.Errorf("Expected current offset 10, got %d", sub.CurrentOffset) + } + + // Test invalid seek (beyond high water mark) + err = sub.SeekToOffset(25) + if err == nil { + t.Error("Expected error for seek beyond high water mark") + } + + // Test negative seek + err = sub.SeekToOffset(-1) + if err == nil { + t.Error("Expected error for negative seek offset") + } +} + +func TestOffsetSubscription_AdvanceOffset(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create subscription + sub, err := subscriber.CreateSubscription("advance-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Test single advance + initialOffset := sub.GetNextOffset() + sub.AdvanceOffset() + + if sub.GetNextOffset() != initialOffset+1 { + t.Errorf("Expected offset %d, got %d", initialOffset+1, sub.GetNextOffset()) + } + + // Test batch advance + sub.AdvanceOffsetBy(5) + + if sub.GetNextOffset() != initialOffset+6 { + t.Errorf("Expected offset %d, got %d", initialOffset+6, sub.GetNextOffset()) + } +} + +func TestOffsetSubscription_GetLag(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 15) + + // Create subscription at offset 5 + sub, err := subscriber.CreateSubscription("lag-test", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Check initial lag + lag, err := sub.GetLag() + if err != nil { + t.Fatalf("Failed to get lag: %v", err) + } + + expectedLag := int64(15 - 5) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d, got %d", expectedLag, lag) + } + + // Advance and check lag again + sub.AdvanceOffsetBy(3) + + lag, err = sub.GetLag() + if err != nil { + t.Fatalf("Failed to get lag after advance: %v", err) + } + + expectedLag = int64(15 - 8) // hwm - current + if lag != expectedLag { + t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag) + } +} + +func TestOffsetSubscription_IsAtEnd(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 10) + + // Create subscription at end + sub, err := subscriber.CreateSubscription("end-test", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Should be at end + atEnd, err := sub.IsAtEnd() + if err != nil { + t.Fatalf("Failed to check if at end: %v", err) + } + + if !atEnd { + t.Error("Expected subscription to be at end") + } + + // Seek to middle and check again + sub.SeekToOffset(5) + + atEnd, err = sub.IsAtEnd() + if err != nil { + t.Fatalf("Failed to check if at end after seek: %v", err) + } + + if atEnd { + t.Error("Expected subscription not to be at end after seek") + } +} + +func TestOffsetSubscription_GetOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 20) + + // Create subscription + sub, err := subscriber.CreateSubscription("range-test", partition, schema_pb.OffsetType_EXACT_OFFSET, 5) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Test normal range + offsetRange, err := sub.GetOffsetRange(10) + if err != nil { + t.Fatalf("Failed to get offset range: %v", err) + } + + if offsetRange.StartOffset != 5 { + t.Errorf("Expected start offset 5, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 14 { + t.Errorf("Expected end offset 14, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 10 { + t.Errorf("Expected count 10, got %d", offsetRange.Count) + } + + // Test range that exceeds high water mark + sub.SeekToOffset(15) + offsetRange, err = sub.GetOffsetRange(10) + if err != nil { + t.Fatalf("Failed to get offset range near end: %v", err) + } + + if offsetRange.StartOffset != 15 { + t.Errorf("Expected start offset 15, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 19 { // Should be capped at hwm-1 + t.Errorf("Expected end offset 19, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 5 { + t.Errorf("Expected count 5, got %d", offsetRange.Count) + } +} + +func TestOffsetSubscription_EmptyRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 10) + + // Create subscription at end + sub, err := subscriber.CreateSubscription("empty-range-test", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Request range when at end + offsetRange, err := sub.GetOffsetRange(5) + if err != nil { + t.Fatalf("Failed to get offset range at end: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range (count 0), got count %d", offsetRange.Count) + } + + if offsetRange.StartOffset != 10 { + t.Errorf("Expected start offset 10, got %d", offsetRange.StartOffset) + } + + if offsetRange.EndOffset != 9 { // Empty range: end < start + t.Errorf("Expected end offset 9 (empty range), got %d", offsetRange.EndOffset) + } +} + +func TestOffsetSeeker_ValidateOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + seeker := NewOffsetSeeker(registry) + partition := createTestPartition() + + // Assign offsets + registry.AssignOffsets(partition, 15) + + // Test valid range + err := seeker.ValidateOffsetRange(partition, 5, 10) + if err != nil { + t.Errorf("Valid range should not return error: %v", err) + } + + // Test invalid ranges + testCases := []struct { + name string + startOffset int64 + endOffset int64 + expectError bool + }{ + {"negative start", -1, 5, true}, + {"end before start", 10, 5, true}, + {"start beyond hwm", 20, 25, true}, + {"valid range", 0, 14, false}, + {"single offset", 5, 5, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := seeker.ValidateOffsetRange(partition, tc.startOffset, tc.endOffset) + if tc.expectError && err == nil { + t.Error("Expected error but got none") + } + if !tc.expectError && err != nil { + t.Errorf("Expected no error but got: %v", err) + } + }) + } +} + +func TestOffsetSeeker_GetAvailableOffsetRange(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + seeker := NewOffsetSeeker(registry) + partition := createTestPartition() + + // Test empty partition + offsetRange, err := seeker.GetAvailableOffsetRange(partition) + if err != nil { + t.Fatalf("Failed to get available range for empty partition: %v", err) + } + + if offsetRange.Count != 0 { + t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count) + } + + // Assign offsets and test again + registry.AssignOffsets(partition, 25) + + offsetRange, err = seeker.GetAvailableOffsetRange(partition) + if err != nil { + t.Fatalf("Failed to get available range: %v", err) + } + + if offsetRange.StartOffset != 0 { + t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset) + } + if offsetRange.EndOffset != 24 { + t.Errorf("Expected end offset 24, got %d", offsetRange.EndOffset) + } + if offsetRange.Count != 25 { + t.Errorf("Expected count 25, got %d", offsetRange.Count) + } +} + +func TestOffsetSubscriber_CloseSubscription(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create subscription + sub, err := subscriber.CreateSubscription("close-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + // Verify subscription exists + _, err = subscriber.GetSubscription("close-test") + if err != nil { + t.Fatalf("Subscription should exist: %v", err) + } + + // Close subscription + err = subscriber.CloseSubscription("close-test") + if err != nil { + t.Fatalf("Failed to close subscription: %v", err) + } + + // Verify subscription is gone + _, err = subscriber.GetSubscription("close-test") + if err == nil { + t.Error("Subscription should not exist after close") + } + + // Verify subscription is marked inactive + if sub.IsActive { + t.Error("Subscription should be marked inactive after close") + } +} + +func TestOffsetSubscription_InactiveOperations(t *testing.T) { + storage := NewInMemoryOffsetStorage() + registry := NewPartitionOffsetRegistry(storage) + subscriber := NewOffsetSubscriber(registry) + partition := createTestPartition() + + // Create and close subscription + sub, err := subscriber.CreateSubscription("inactive-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + + subscriber.CloseSubscription("inactive-test") + + // Test operations on inactive subscription + err = sub.SeekToOffset(5) + if err == nil { + t.Error("Expected error for seek on inactive subscription") + } + + _, err = sub.GetLag() + if err == nil { + t.Error("Expected error for GetLag on inactive subscription") + } + + _, err = sub.IsAtEnd() + if err == nil { + t.Error("Expected error for IsAtEnd on inactive subscription") + } + + _, err = sub.GetOffsetRange(10) + if err == nil { + t.Error("Expected error for GetOffsetRange on inactive subscription") + } +}