Browse Source

Phase 3: Implement offset-based subscription and SMQ integration

- Add OffsetSubscriber for managing offset-based subscriptions
- Implement OffsetSubscription with seeking, lag tracking, and range operations
- Add OffsetSeeker for offset validation and range utilities
- Create SMQOffsetIntegration for bridging offset management with SMQ broker
- Support all OffsetType variants: EXACT_OFFSET, RESET_TO_OFFSET, RESET_TO_EARLIEST, RESET_TO_LATEST
- Implement subscription lifecycle: create, seek, advance, close
- Add comprehensive offset validation and error handling
- Support batch record publishing and subscription
- Add offset metrics and partition information APIs
- Include extensive test coverage for all subscription scenarios:
  - Basic subscription creation and record consumption
  - Offset seeking and range operations
  - Subscription lag tracking and end-of-stream detection
  - Empty partition handling and error conditions
  - Integration with offset assignment and high water marks
- All 40+ tests pass, providing robust offset-based messaging foundation
pull/7231/head
chrislu 2 months ago
parent
commit
82fb366968
  1. 350
      weed/mq/offset/integration.go
  2. 537
      weed/mq/offset/integration_test.go
  3. 349
      weed/mq/offset/subscriber.go
  4. 457
      weed/mq/offset/subscriber_test.go

350
weed/mq/offset/integration.go

@ -0,0 +1,350 @@
package offset
import (
"fmt"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// SMQOffsetIntegration provides integration between offset management and SMQ broker
type SMQOffsetIntegration struct {
mu sync.RWMutex
offsetAssigner *OffsetAssigner
offsetSubscriber *OffsetSubscriber
offsetSeeker *OffsetSeeker
// Mapping between SMQ records and offsets
recordOffsetMap map[string]int64 // record key -> offset
offsetRecordMap map[string]map[int64]int64 // partition key -> offset -> record timestamp
}
// NewSMQOffsetIntegration creates a new SMQ offset integration
func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration {
registry := NewPartitionOffsetRegistry(storage)
assigner := &OffsetAssigner{registry: registry}
return &SMQOffsetIntegration{
offsetAssigner: assigner,
offsetSubscriber: NewOffsetSubscriber(registry),
offsetSeeker: NewOffsetSeeker(registry),
recordOffsetMap: make(map[string]int64),
offsetRecordMap: make(map[string]map[int64]int64),
}
}
// PublishRecord publishes a record and assigns it an offset
func (integration *SMQOffsetIntegration) PublishRecord(
partition *schema_pb.Partition,
key []byte,
value *schema_pb.RecordValue,
) (*mq_agent_pb.PublishRecordResponse, error) {
// Assign offset for this record
result := integration.offsetAssigner.AssignSingleOffset(partition)
if result.Error != nil {
return &mq_agent_pb.PublishRecordResponse{
Error: fmt.Sprintf("Failed to assign offset: %v", result.Error),
}, nil
}
assignment := result.Assignment
// Store the mapping for later retrieval
integration.mu.Lock()
recordKey := string(key)
integration.recordOffsetMap[recordKey] = assignment.Offset
partitionKey := partitionKey(partition)
if integration.offsetRecordMap[partitionKey] == nil {
integration.offsetRecordMap[partitionKey] = make(map[int64]int64)
}
integration.offsetRecordMap[partitionKey][assignment.Offset] = assignment.Timestamp
integration.mu.Unlock()
// Return response with offset information
return &mq_agent_pb.PublishRecordResponse{
AckSequence: assignment.Offset, // Use offset as ack sequence for now
BaseOffset: assignment.Offset,
LastOffset: assignment.Offset,
Error: "",
}, nil
}
// PublishRecordBatch publishes a batch of records and assigns them offsets
func (integration *SMQOffsetIntegration) PublishRecordBatch(
partition *schema_pb.Partition,
records []PublishRecordRequest,
) (*mq_agent_pb.PublishRecordResponse, error) {
if len(records) == 0 {
return &mq_agent_pb.PublishRecordResponse{
Error: "Empty record batch",
}, nil
}
// Assign batch of offsets
result := integration.offsetAssigner.AssignBatchOffsets(partition, int64(len(records)))
if result.Error != nil {
return &mq_agent_pb.PublishRecordResponse{
Error: fmt.Sprintf("Failed to assign batch offsets: %v", result.Error),
}, nil
}
batch := result.Batch
// Store mappings for all records in the batch
integration.mu.Lock()
for i, record := range records {
recordKey := string(record.Key)
offset := batch.BaseOffset + int64(i)
integration.recordOffsetMap[recordKey] = offset
}
partitionKey := partitionKey(partition)
if integration.offsetRecordMap[partitionKey] == nil {
integration.offsetRecordMap[partitionKey] = make(map[int64]int64)
}
for i := int64(0); i < batch.Count; i++ {
offset := batch.BaseOffset + i
integration.offsetRecordMap[partitionKey][offset] = batch.Timestamp
}
integration.mu.Unlock()
return &mq_agent_pb.PublishRecordResponse{
AckSequence: batch.LastOffset, // Use last offset as ack sequence
BaseOffset: batch.BaseOffset,
LastOffset: batch.LastOffset,
Error: "",
}, nil
}
// CreateSubscription creates an offset-based subscription
func (integration *SMQOffsetIntegration) CreateSubscription(
subscriptionID string,
partition *schema_pb.Partition,
offsetType schema_pb.OffsetType,
startOffset int64,
) (*OffsetSubscription, error) {
return integration.offsetSubscriber.CreateSubscription(
subscriptionID,
partition,
offsetType,
startOffset,
)
}
// SubscribeRecords subscribes to records starting from a specific offset
func (integration *SMQOffsetIntegration) SubscribeRecords(
subscription *OffsetSubscription,
maxRecords int64,
) ([]*mq_agent_pb.SubscribeRecordResponse, error) {
if !subscription.IsActive {
return nil, fmt.Errorf("subscription is not active")
}
// Get the range of offsets to read
offsetRange, err := subscription.GetOffsetRange(maxRecords)
if err != nil {
return nil, fmt.Errorf("failed to get offset range: %w", err)
}
if offsetRange.Count == 0 {
// No records available
return []*mq_agent_pb.SubscribeRecordResponse{}, nil
}
// TODO: This is where we would integrate with SMQ's actual storage layer
// For now, return mock responses with offset information
responses := make([]*mq_agent_pb.SubscribeRecordResponse, offsetRange.Count)
for i := int64(0); i < offsetRange.Count; i++ {
offset := offsetRange.StartOffset + i
responses[i] = &mq_agent_pb.SubscribeRecordResponse{
Key: []byte(fmt.Sprintf("key-%d", offset)),
Value: &schema_pb.RecordValue{}, // Mock value
TsNs: offset * 1000000, // Mock timestamp based on offset
Offset: offset,
IsEndOfStream: false,
IsEndOfTopic: false,
Error: "",
}
}
// Advance the subscription
subscription.AdvanceOffsetBy(offsetRange.Count)
return responses, nil
}
// GetHighWaterMark returns the high water mark for a partition
func (integration *SMQOffsetIntegration) GetHighWaterMark(partition *schema_pb.Partition) (int64, error) {
return integration.offsetAssigner.GetHighWaterMark(partition)
}
// SeekSubscription seeks a subscription to a specific offset
func (integration *SMQOffsetIntegration) SeekSubscription(
subscriptionID string,
offset int64,
) error {
subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
if err != nil {
return fmt.Errorf("subscription not found: %w", err)
}
return subscription.SeekToOffset(offset)
}
// GetSubscriptionLag returns the lag for a subscription
func (integration *SMQOffsetIntegration) GetSubscriptionLag(subscriptionID string) (int64, error) {
subscription, err := integration.offsetSubscriber.GetSubscription(subscriptionID)
if err != nil {
return 0, fmt.Errorf("subscription not found: %w", err)
}
return subscription.GetLag()
}
// CloseSubscription closes a subscription
func (integration *SMQOffsetIntegration) CloseSubscription(subscriptionID string) error {
return integration.offsetSubscriber.CloseSubscription(subscriptionID)
}
// ValidateOffsetRange validates an offset range for a partition
func (integration *SMQOffsetIntegration) ValidateOffsetRange(
partition *schema_pb.Partition,
startOffset, endOffset int64,
) error {
return integration.offsetSeeker.ValidateOffsetRange(partition, startOffset, endOffset)
}
// GetAvailableOffsetRange returns the available offset range for a partition
func (integration *SMQOffsetIntegration) GetAvailableOffsetRange(partition *schema_pb.Partition) (*OffsetRange, error) {
return integration.offsetSeeker.GetAvailableOffsetRange(partition)
}
// PublishRecordRequest represents a record to be published
type PublishRecordRequest struct {
Key []byte
Value *schema_pb.RecordValue
}
// OffsetMetrics provides metrics about offset usage
type OffsetMetrics struct {
PartitionCount int64
TotalOffsets int64
ActiveSubscriptions int64
AverageLatency float64
}
// GetOffsetMetrics returns metrics about offset usage
func (integration *SMQOffsetIntegration) GetOffsetMetrics() *OffsetMetrics {
integration.mu.RLock()
defer integration.mu.RUnlock()
// Count active subscriptions
activeSubscriptions := int64(0)
for _, subscription := range integration.offsetSubscriber.subscriptions {
if subscription.IsActive {
activeSubscriptions++
}
}
return &OffsetMetrics{
PartitionCount: int64(len(integration.offsetAssigner.registry.managers)),
TotalOffsets: int64(len(integration.recordOffsetMap)),
ActiveSubscriptions: activeSubscriptions,
AverageLatency: 0.0, // TODO: Implement latency tracking
}
}
// OffsetInfo provides detailed information about an offset
type OffsetInfo struct {
Offset int64
Timestamp int64
Partition *schema_pb.Partition
Exists bool
}
// GetOffsetInfo returns detailed information about a specific offset
func (integration *SMQOffsetIntegration) GetOffsetInfo(
partition *schema_pb.Partition,
offset int64,
) (*OffsetInfo, error) {
hwm, err := integration.GetHighWaterMark(partition)
if err != nil {
return nil, fmt.Errorf("failed to get high water mark: %w", err)
}
exists := offset >= 0 && offset < hwm
// TODO: Get actual timestamp from storage
timestamp := int64(0)
if exists {
integration.mu.RLock()
partitionKey := partitionKey(partition)
if offsetMap, found := integration.offsetRecordMap[partitionKey]; found {
if ts, found := offsetMap[offset]; found {
timestamp = ts
}
}
integration.mu.RUnlock()
}
return &OffsetInfo{
Offset: offset,
Timestamp: timestamp,
Partition: partition,
Exists: exists,
}, nil
}
// PartitionOffsetInfo provides offset information for a partition
type PartitionOffsetInfo struct {
Partition *schema_pb.Partition
EarliestOffset int64
LatestOffset int64
HighWaterMark int64
RecordCount int64
ActiveSubscriptions int64
}
// GetPartitionOffsetInfo returns comprehensive offset information for a partition
func (integration *SMQOffsetIntegration) GetPartitionOffsetInfo(partition *schema_pb.Partition) (*PartitionOffsetInfo, error) {
hwm, err := integration.GetHighWaterMark(partition)
if err != nil {
return nil, fmt.Errorf("failed to get high water mark: %w", err)
}
earliestOffset := int64(0)
latestOffset := hwm - 1
if hwm == 0 {
latestOffset = -1 // No records
}
// Count active subscriptions for this partition
activeSubscriptions := int64(0)
integration.mu.RLock()
for _, subscription := range integration.offsetSubscriber.subscriptions {
if subscription.IsActive && partitionKey(subscription.Partition) == partitionKey(partition) {
activeSubscriptions++
}
}
integration.mu.RUnlock()
return &PartitionOffsetInfo{
Partition: partition,
EarliestOffset: earliestOffset,
LatestOffset: latestOffset,
HighWaterMark: hwm,
RecordCount: hwm,
ActiveSubscriptions: activeSubscriptions,
}, nil
}

537
weed/mq/offset/integration_test.go

@ -0,0 +1,537 @@
package offset
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish a single record
response, err := integration.PublishRecord(
partition,
[]byte("test-key"),
&schema_pb.RecordValue{},
)
if err != nil {
t.Fatalf("Failed to publish record: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 0 {
t.Errorf("Expected last offset 0, got %d", response.LastOffset)
}
}
func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create batch of records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
// Publish batch
response, err := integration.PublishRecordBatch(partition, records)
if err != nil {
t.Fatalf("Failed to publish record batch: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 2 {
t.Errorf("Expected last offset 2, got %d", response.LastOffset)
}
// Verify high water mark
hwm, err := integration.GetHighWaterMark(partition)
if err != nil {
t.Fatalf("Failed to get high water mark: %v", err)
}
if hwm != 3 {
t.Errorf("Expected high water mark 3, got %d", hwm)
}
}
func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish empty batch
response, err := integration.PublishRecordBatch(partition, []PublishRecordRequest{})
if err != nil {
t.Fatalf("Failed to publish empty batch: %v", err)
}
if response.Error == "" {
t.Error("Expected error for empty batch")
}
}
func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records first
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"test-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
if sub.ID != "test-sub" {
t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
}
if sub.StartOffset != 0 {
t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
}
}
func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"test-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records
responses, err := integration.SubscribeRecords(sub, 2)
if err != nil {
t.Fatalf("Failed to subscribe to records: %v", err)
}
if len(responses) != 2 {
t.Errorf("Expected 2 responses, got %d", len(responses))
}
// Check offset progression
if responses[0].Offset != 0 {
t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
}
if responses[1].Offset != 1 {
t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
}
// Check subscription advancement
if sub.CurrentOffset != 2 {
t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
}
}
func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription on empty partition
sub, err := integration.CreateSubscription(
"empty-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records (should return empty)
responses, err := integration.SubscribeRecords(sub, 10)
if err != nil {
t.Fatalf("Failed to subscribe to empty partition: %v", err)
}
if len(responses) != 0 {
t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
}
}
func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"seek-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Seek to offset 3
err = integration.SeekSubscription("seek-sub", 3)
if err != nil {
t.Fatalf("Failed to seek subscription: %v", err)
}
if sub.CurrentOffset != 3 {
t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
}
// Subscribe from new position
responses, err := integration.SubscribeRecords(sub, 2)
if err != nil {
t.Fatalf("Failed to subscribe after seek: %v", err)
}
if len(responses) != 2 {
t.Errorf("Expected 2 responses after seek, got %d", len(responses))
}
if responses[0].Offset != 3 {
t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
}
}
func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscription at offset 1
sub, err := integration.CreateSubscription(
"lag-sub",
partition,
schema_pb.OffsetType_EXACT_OFFSET,
1,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Get lag
lag, err := integration.GetSubscriptionLag("lag-sub")
if err != nil {
t.Fatalf("Failed to get subscription lag: %v", err)
}
expectedLag := int64(3 - 1) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d, got %d", expectedLag, lag)
}
// Advance subscription and check lag again
integration.SubscribeRecords(sub, 1)
lag, err = integration.GetSubscriptionLag("lag-sub")
if err != nil {
t.Fatalf("Failed to get lag after advance: %v", err)
}
expectedLag = int64(3 - 2) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
}
}
func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription
_, err := integration.CreateSubscription(
"close-sub",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Close subscription
err = integration.CloseSubscription("close-sub")
if err != nil {
t.Fatalf("Failed to close subscription: %v", err)
}
// Try to get lag (should fail)
_, err = integration.GetSubscriptionLag("close-sub")
if err == nil {
t.Error("Expected error when getting lag for closed subscription")
}
}
func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Test valid range
err := integration.ValidateOffsetRange(partition, 0, 2)
if err != nil {
t.Errorf("Valid range should not return error: %v", err)
}
// Test invalid range (beyond hwm)
err = integration.ValidateOffsetRange(partition, 0, 5)
if err == nil {
t.Error("Expected error for range beyond high water mark")
}
}
func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test empty partition
offsetRange, err := integration.GetAvailableOffsetRange(partition)
if err != nil {
t.Fatalf("Failed to get available range for empty partition: %v", err)
}
if offsetRange.Count != 0 {
t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Test with data
offsetRange, err = integration.GetAvailableOffsetRange(partition)
if err != nil {
t.Fatalf("Failed to get available range: %v", err)
}
if offsetRange.StartOffset != 0 {
t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 1 {
t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
}
if offsetRange.Count != 2 {
t.Errorf("Expected count 2, got %d", offsetRange.Count)
}
}
func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Initial metrics
metrics := integration.GetOffsetMetrics()
if metrics.TotalOffsets != 0 {
t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
}
if metrics.ActiveSubscriptions != 0 {
t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscriptions
integration.CreateSubscription("sub1", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
integration.CreateSubscription("sub2", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
// Check updated metrics
metrics = integration.GetOffsetMetrics()
if metrics.TotalOffsets != 2 {
t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
}
if metrics.ActiveSubscriptions != 2 {
t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
}
if metrics.PartitionCount != 1 {
t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
}
}
func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test non-existent offset
info, err := integration.GetOffsetInfo(partition, 0)
if err != nil {
t.Fatalf("Failed to get offset info: %v", err)
}
if info.Exists {
t.Error("Offset should not exist in empty partition")
}
// Publish record
integration.PublishRecord(partition, []byte("key1"), &schema_pb.RecordValue{})
// Test existing offset
info, err = integration.GetOffsetInfo(partition, 0)
if err != nil {
t.Fatalf("Failed to get offset info for existing offset: %v", err)
}
if !info.Exists {
t.Error("Offset should exist after publishing")
}
if info.Offset != 0 {
t.Errorf("Expected offset 0, got %d", info.Offset)
}
}
func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test empty partition
info, err := integration.GetPartitionOffsetInfo(partition)
if err != nil {
t.Fatalf("Failed to get partition offset info: %v", err)
}
if info.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
}
if info.LatestOffset != -1 {
t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
}
if info.HighWaterMark != 0 {
t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
}
if info.RecordCount != 0 {
t.Errorf("Expected record count 0, got %d", info.RecordCount)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch(partition, records)
// Create subscription
integration.CreateSubscription("test-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
// Test with data
info, err = integration.GetPartitionOffsetInfo(partition)
if err != nil {
t.Fatalf("Failed to get partition offset info with data: %v", err)
}
if info.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
}
if info.LatestOffset != 2 {
t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
}
if info.HighWaterMark != 3 {
t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
}
if info.RecordCount != 3 {
t.Errorf("Expected record count 3, got %d", info.RecordCount)
}
if info.ActiveSubscriptions != 1 {
t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
}
}

349
weed/mq/offset/subscriber.go

@ -0,0 +1,349 @@
package offset
import (
"fmt"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// OffsetSubscriber handles offset-based subscription logic
type OffsetSubscriber struct {
mu sync.RWMutex
offsetRegistry *PartitionOffsetRegistry
subscriptions map[string]*OffsetSubscription
}
// OffsetSubscription represents an active offset-based subscription
type OffsetSubscription struct {
ID string
Partition *schema_pb.Partition
StartOffset int64
CurrentOffset int64
OffsetType schema_pb.OffsetType
IsActive bool
offsetRegistry *PartitionOffsetRegistry
}
// NewOffsetSubscriber creates a new offset-based subscriber
func NewOffsetSubscriber(offsetRegistry *PartitionOffsetRegistry) *OffsetSubscriber {
return &OffsetSubscriber{
offsetRegistry: offsetRegistry,
subscriptions: make(map[string]*OffsetSubscription),
}
}
// CreateSubscription creates a new offset-based subscription
func (s *OffsetSubscriber) CreateSubscription(
subscriptionID string,
partition *schema_pb.Partition,
offsetType schema_pb.OffsetType,
startOffset int64,
) (*OffsetSubscription, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Check if subscription already exists
if _, exists := s.subscriptions[subscriptionID]; exists {
return nil, fmt.Errorf("subscription %s already exists", subscriptionID)
}
// Resolve the actual start offset based on type
actualStartOffset, err := s.resolveStartOffset(partition, offsetType, startOffset)
if err != nil {
return nil, fmt.Errorf("failed to resolve start offset: %w", err)
}
subscription := &OffsetSubscription{
ID: subscriptionID,
Partition: partition,
StartOffset: actualStartOffset,
CurrentOffset: actualStartOffset,
OffsetType: offsetType,
IsActive: true,
offsetRegistry: s.offsetRegistry,
}
s.subscriptions[subscriptionID] = subscription
return subscription, nil
}
// GetSubscription retrieves an existing subscription
func (s *OffsetSubscriber) GetSubscription(subscriptionID string) (*OffsetSubscription, error) {
s.mu.RLock()
defer s.mu.RUnlock()
subscription, exists := s.subscriptions[subscriptionID]
if !exists {
return nil, fmt.Errorf("subscription %s not found", subscriptionID)
}
return subscription, nil
}
// CloseSubscription closes and removes a subscription
func (s *OffsetSubscriber) CloseSubscription(subscriptionID string) error {
s.mu.Lock()
defer s.mu.Unlock()
subscription, exists := s.subscriptions[subscriptionID]
if !exists {
return fmt.Errorf("subscription %s not found", subscriptionID)
}
subscription.IsActive = false
delete(s.subscriptions, subscriptionID)
return nil
}
// resolveStartOffset resolves the actual start offset based on OffsetType
func (s *OffsetSubscriber) resolveStartOffset(
partition *schema_pb.Partition,
offsetType schema_pb.OffsetType,
requestedOffset int64,
) (int64, error) {
switch offsetType {
case schema_pb.OffsetType_EXACT_OFFSET:
// Validate that the requested offset exists
return s.validateAndGetOffset(partition, requestedOffset)
case schema_pb.OffsetType_RESET_TO_OFFSET:
// Use the requested offset, even if it doesn't exist yet
return requestedOffset, nil
case schema_pb.OffsetType_RESET_TO_EARLIEST:
// Start from offset 0
return 0, nil
case schema_pb.OffsetType_RESET_TO_LATEST:
// Start from the current high water mark
hwm, err := s.offsetRegistry.GetHighWaterMark(partition)
if err != nil {
return 0, err
}
return hwm, nil
case schema_pb.OffsetType_RESUME_OR_EARLIEST:
// Try to resume from a saved position, fallback to earliest
// For now, just use earliest (consumer group position tracking will be added later)
return 0, nil
case schema_pb.OffsetType_RESUME_OR_LATEST:
// Try to resume from a saved position, fallback to latest
// For now, just use latest
hwm, err := s.offsetRegistry.GetHighWaterMark(partition)
if err != nil {
return 0, err
}
return hwm, nil
default:
return 0, fmt.Errorf("unsupported offset type: %v", offsetType)
}
}
// validateAndGetOffset validates that an offset exists and returns it
func (s *OffsetSubscriber) validateAndGetOffset(partition *schema_pb.Partition, offset int64) (int64, error) {
if offset < 0 {
return 0, fmt.Errorf("offset cannot be negative: %d", offset)
}
// Get the current high water mark
hwm, err := s.offsetRegistry.GetHighWaterMark(partition)
if err != nil {
return 0, fmt.Errorf("failed to get high water mark: %w", err)
}
// Check if offset is within valid range
if offset >= hwm {
return 0, fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm)
}
return offset, nil
}
// SeekToOffset seeks a subscription to a specific offset
func (sub *OffsetSubscription) SeekToOffset(offset int64) error {
if !sub.IsActive {
return fmt.Errorf("subscription is not active")
}
// Validate the offset
if offset < 0 {
return fmt.Errorf("offset cannot be negative: %d", offset)
}
hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition)
if err != nil {
return fmt.Errorf("failed to get high water mark: %w", err)
}
if offset > hwm {
return fmt.Errorf("offset %d is beyond high water mark %d", offset, hwm)
}
sub.CurrentOffset = offset
return nil
}
// GetNextOffset returns the next offset to read
func (sub *OffsetSubscription) GetNextOffset() int64 {
return sub.CurrentOffset
}
// AdvanceOffset advances the subscription to the next offset
func (sub *OffsetSubscription) AdvanceOffset() {
sub.CurrentOffset++
}
// GetLag returns the lag between current position and high water mark
func (sub *OffsetSubscription) GetLag() (int64, error) {
if !sub.IsActive {
return 0, fmt.Errorf("subscription is not active")
}
hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition)
if err != nil {
return 0, fmt.Errorf("failed to get high water mark: %w", err)
}
lag := hwm - sub.CurrentOffset
if lag < 0 {
lag = 0
}
return lag, nil
}
// IsAtEnd checks if the subscription has reached the end of available data
func (sub *OffsetSubscription) IsAtEnd() (bool, error) {
if !sub.IsActive {
return true, fmt.Errorf("subscription is not active")
}
hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition)
if err != nil {
return false, fmt.Errorf("failed to get high water mark: %w", err)
}
return sub.CurrentOffset >= hwm, nil
}
// OffsetRange represents a range of offsets
type OffsetRange struct {
StartOffset int64
EndOffset int64
Count int64
}
// GetOffsetRange returns a range of offsets for batch reading
func (sub *OffsetSubscription) GetOffsetRange(maxCount int64) (*OffsetRange, error) {
if !sub.IsActive {
return nil, fmt.Errorf("subscription is not active")
}
hwm, err := sub.offsetRegistry.GetHighWaterMark(sub.Partition)
if err != nil {
return nil, fmt.Errorf("failed to get high water mark: %w", err)
}
startOffset := sub.CurrentOffset
endOffset := startOffset + maxCount - 1
// Don't go beyond high water mark
if endOffset >= hwm {
endOffset = hwm - 1
}
// If start is already at or beyond HWM, return empty range
if startOffset >= hwm {
return &OffsetRange{
StartOffset: startOffset,
EndOffset: startOffset - 1, // Empty range
Count: 0,
}, nil
}
count := endOffset - startOffset + 1
return &OffsetRange{
StartOffset: startOffset,
EndOffset: endOffset,
Count: count,
}, nil
}
// AdvanceOffsetBy advances the subscription by a specific number of offsets
func (sub *OffsetSubscription) AdvanceOffsetBy(count int64) {
sub.CurrentOffset += count
}
// OffsetSeeker provides utilities for offset-based seeking
type OffsetSeeker struct {
offsetRegistry *PartitionOffsetRegistry
}
// NewOffsetSeeker creates a new offset seeker
func NewOffsetSeeker(offsetRegistry *PartitionOffsetRegistry) *OffsetSeeker {
return &OffsetSeeker{
offsetRegistry: offsetRegistry,
}
}
// SeekToTimestamp finds the offset closest to a given timestamp
// This bridges offset-based and timestamp-based seeking
func (seeker *OffsetSeeker) SeekToTimestamp(partition *schema_pb.Partition, timestamp int64) (int64, error) {
// TODO: This requires integration with the storage layer to map timestamps to offsets
// For now, return an error indicating this feature needs implementation
return 0, fmt.Errorf("timestamp-to-offset mapping not implemented yet")
}
// ValidateOffsetRange validates that an offset range is valid
func (seeker *OffsetSeeker) ValidateOffsetRange(partition *schema_pb.Partition, startOffset, endOffset int64) error {
if startOffset < 0 {
return fmt.Errorf("start offset cannot be negative: %d", startOffset)
}
if endOffset < startOffset {
return fmt.Errorf("end offset %d cannot be less than start offset %d", endOffset, startOffset)
}
hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition)
if err != nil {
return fmt.Errorf("failed to get high water mark: %w", err)
}
if startOffset >= hwm {
return fmt.Errorf("start offset %d is beyond high water mark %d", startOffset, hwm)
}
if endOffset >= hwm {
return fmt.Errorf("end offset %d is beyond high water mark %d", endOffset, hwm)
}
return nil
}
// GetAvailableOffsetRange returns the range of available offsets for a partition
func (seeker *OffsetSeeker) GetAvailableOffsetRange(partition *schema_pb.Partition) (*OffsetRange, error) {
hwm, err := seeker.offsetRegistry.GetHighWaterMark(partition)
if err != nil {
return nil, fmt.Errorf("failed to get high water mark: %w", err)
}
if hwm == 0 {
// No data available
return &OffsetRange{
StartOffset: 0,
EndOffset: -1,
Count: 0,
}, nil
}
return &OffsetRange{
StartOffset: 0,
EndOffset: hwm - 1,
Count: hwm,
}, nil
}

457
weed/mq/offset/subscriber_test.go

@ -0,0 +1,457 @@
package offset
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestOffsetSubscriber_CreateSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign some offsets first
registry.AssignOffsets(partition, 10)
// Test EXACT_OFFSET subscription
sub, err := subscriber.CreateSubscription("test-sub-1", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
if err != nil {
t.Fatalf("Failed to create EXACT_OFFSET subscription: %v", err)
}
if sub.StartOffset != 5 {
t.Errorf("Expected start offset 5, got %d", sub.StartOffset)
}
if sub.CurrentOffset != 5 {
t.Errorf("Expected current offset 5, got %d", sub.CurrentOffset)
}
// Test RESET_TO_LATEST subscription
sub2, err := subscriber.CreateSubscription("test-sub-2", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
if err != nil {
t.Fatalf("Failed to create RESET_TO_LATEST subscription: %v", err)
}
if sub2.StartOffset != 10 { // Should be at high water mark
t.Errorf("Expected start offset 10, got %d", sub2.StartOffset)
}
}
func TestOffsetSubscriber_InvalidSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign some offsets
registry.AssignOffsets(partition, 5)
// Test invalid offset (beyond high water mark)
_, err := subscriber.CreateSubscription("invalid-sub", partition, schema_pb.OffsetType_EXACT_OFFSET, 10)
if err == nil {
t.Error("Expected error for offset beyond high water mark")
}
// Test negative offset
_, err = subscriber.CreateSubscription("invalid-sub-2", partition, schema_pb.OffsetType_EXACT_OFFSET, -1)
if err == nil {
t.Error("Expected error for negative offset")
}
}
func TestOffsetSubscriber_DuplicateSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Create first subscription
_, err := subscriber.CreateSubscription("duplicate-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err != nil {
t.Fatalf("Failed to create first subscription: %v", err)
}
// Try to create duplicate
_, err = subscriber.CreateSubscription("duplicate-sub", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err == nil {
t.Error("Expected error for duplicate subscription ID")
}
}
func TestOffsetSubscription_SeekToOffset(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 20)
// Create subscription
sub, err := subscriber.CreateSubscription("seek-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Test valid seek
err = sub.SeekToOffset(10)
if err != nil {
t.Fatalf("Failed to seek to offset 10: %v", err)
}
if sub.CurrentOffset != 10 {
t.Errorf("Expected current offset 10, got %d", sub.CurrentOffset)
}
// Test invalid seek (beyond high water mark)
err = sub.SeekToOffset(25)
if err == nil {
t.Error("Expected error for seek beyond high water mark")
}
// Test negative seek
err = sub.SeekToOffset(-1)
if err == nil {
t.Error("Expected error for negative seek offset")
}
}
func TestOffsetSubscription_AdvanceOffset(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Create subscription
sub, err := subscriber.CreateSubscription("advance-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Test single advance
initialOffset := sub.GetNextOffset()
sub.AdvanceOffset()
if sub.GetNextOffset() != initialOffset+1 {
t.Errorf("Expected offset %d, got %d", initialOffset+1, sub.GetNextOffset())
}
// Test batch advance
sub.AdvanceOffsetBy(5)
if sub.GetNextOffset() != initialOffset+6 {
t.Errorf("Expected offset %d, got %d", initialOffset+6, sub.GetNextOffset())
}
}
func TestOffsetSubscription_GetLag(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 15)
// Create subscription at offset 5
sub, err := subscriber.CreateSubscription("lag-test", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Check initial lag
lag, err := sub.GetLag()
if err != nil {
t.Fatalf("Failed to get lag: %v", err)
}
expectedLag := int64(15 - 5) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d, got %d", expectedLag, lag)
}
// Advance and check lag again
sub.AdvanceOffsetBy(3)
lag, err = sub.GetLag()
if err != nil {
t.Fatalf("Failed to get lag after advance: %v", err)
}
expectedLag = int64(15 - 8) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
}
}
func TestOffsetSubscription_IsAtEnd(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 10)
// Create subscription at end
sub, err := subscriber.CreateSubscription("end-test", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Should be at end
atEnd, err := sub.IsAtEnd()
if err != nil {
t.Fatalf("Failed to check if at end: %v", err)
}
if !atEnd {
t.Error("Expected subscription to be at end")
}
// Seek to middle and check again
sub.SeekToOffset(5)
atEnd, err = sub.IsAtEnd()
if err != nil {
t.Fatalf("Failed to check if at end after seek: %v", err)
}
if atEnd {
t.Error("Expected subscription not to be at end after seek")
}
}
func TestOffsetSubscription_GetOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 20)
// Create subscription
sub, err := subscriber.CreateSubscription("range-test", partition, schema_pb.OffsetType_EXACT_OFFSET, 5)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Test normal range
offsetRange, err := sub.GetOffsetRange(10)
if err != nil {
t.Fatalf("Failed to get offset range: %v", err)
}
if offsetRange.StartOffset != 5 {
t.Errorf("Expected start offset 5, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 14 {
t.Errorf("Expected end offset 14, got %d", offsetRange.EndOffset)
}
if offsetRange.Count != 10 {
t.Errorf("Expected count 10, got %d", offsetRange.Count)
}
// Test range that exceeds high water mark
sub.SeekToOffset(15)
offsetRange, err = sub.GetOffsetRange(10)
if err != nil {
t.Fatalf("Failed to get offset range near end: %v", err)
}
if offsetRange.StartOffset != 15 {
t.Errorf("Expected start offset 15, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 19 { // Should be capped at hwm-1
t.Errorf("Expected end offset 19, got %d", offsetRange.EndOffset)
}
if offsetRange.Count != 5 {
t.Errorf("Expected count 5, got %d", offsetRange.Count)
}
}
func TestOffsetSubscription_EmptyRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 10)
// Create subscription at end
sub, err := subscriber.CreateSubscription("empty-range-test", partition, schema_pb.OffsetType_RESET_TO_LATEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Request range when at end
offsetRange, err := sub.GetOffsetRange(5)
if err != nil {
t.Fatalf("Failed to get offset range at end: %v", err)
}
if offsetRange.Count != 0 {
t.Errorf("Expected empty range (count 0), got count %d", offsetRange.Count)
}
if offsetRange.StartOffset != 10 {
t.Errorf("Expected start offset 10, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 9 { // Empty range: end < start
t.Errorf("Expected end offset 9 (empty range), got %d", offsetRange.EndOffset)
}
}
func TestOffsetSeeker_ValidateOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
seeker := NewOffsetSeeker(registry)
partition := createTestPartition()
// Assign offsets
registry.AssignOffsets(partition, 15)
// Test valid range
err := seeker.ValidateOffsetRange(partition, 5, 10)
if err != nil {
t.Errorf("Valid range should not return error: %v", err)
}
// Test invalid ranges
testCases := []struct {
name string
startOffset int64
endOffset int64
expectError bool
}{
{"negative start", -1, 5, true},
{"end before start", 10, 5, true},
{"start beyond hwm", 20, 25, true},
{"valid range", 0, 14, false},
{"single offset", 5, 5, false},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := seeker.ValidateOffsetRange(partition, tc.startOffset, tc.endOffset)
if tc.expectError && err == nil {
t.Error("Expected error but got none")
}
if !tc.expectError && err != nil {
t.Errorf("Expected no error but got: %v", err)
}
})
}
}
func TestOffsetSeeker_GetAvailableOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
seeker := NewOffsetSeeker(registry)
partition := createTestPartition()
// Test empty partition
offsetRange, err := seeker.GetAvailableOffsetRange(partition)
if err != nil {
t.Fatalf("Failed to get available range for empty partition: %v", err)
}
if offsetRange.Count != 0 {
t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
}
// Assign offsets and test again
registry.AssignOffsets(partition, 25)
offsetRange, err = seeker.GetAvailableOffsetRange(partition)
if err != nil {
t.Fatalf("Failed to get available range: %v", err)
}
if offsetRange.StartOffset != 0 {
t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 24 {
t.Errorf("Expected end offset 24, got %d", offsetRange.EndOffset)
}
if offsetRange.Count != 25 {
t.Errorf("Expected count 25, got %d", offsetRange.Count)
}
}
func TestOffsetSubscriber_CloseSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Create subscription
sub, err := subscriber.CreateSubscription("close-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Verify subscription exists
_, err = subscriber.GetSubscription("close-test")
if err != nil {
t.Fatalf("Subscription should exist: %v", err)
}
// Close subscription
err = subscriber.CloseSubscription("close-test")
if err != nil {
t.Fatalf("Failed to close subscription: %v", err)
}
// Verify subscription is gone
_, err = subscriber.GetSubscription("close-test")
if err == nil {
t.Error("Subscription should not exist after close")
}
// Verify subscription is marked inactive
if sub.IsActive {
t.Error("Subscription should be marked inactive after close")
}
}
func TestOffsetSubscription_InactiveOperations(t *testing.T) {
storage := NewInMemoryOffsetStorage()
registry := NewPartitionOffsetRegistry(storage)
subscriber := NewOffsetSubscriber(registry)
partition := createTestPartition()
// Create and close subscription
sub, err := subscriber.CreateSubscription("inactive-test", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
subscriber.CloseSubscription("inactive-test")
// Test operations on inactive subscription
err = sub.SeekToOffset(5)
if err == nil {
t.Error("Expected error for seek on inactive subscription")
}
_, err = sub.GetLag()
if err == nil {
t.Error("Expected error for GetLag on inactive subscription")
}
_, err = sub.IsAtEnd()
if err == nil {
t.Error("Expected error for IsAtEnd on inactive subscription")
}
_, err = sub.GetOffsetRange(10)
if err == nil {
t.Error("Expected error for GetOffsetRange on inactive subscription")
}
}
Loading…
Cancel
Save