You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							544 lines
						
					
					
						
							16 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							544 lines
						
					
					
						
							16 KiB
						
					
					
				| package offset | |
| 
 | |
| import ( | |
| 	"testing" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" | |
| ) | |
| 
 | |
| func TestSMQOffsetIntegration_PublishRecord(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish a single record | |
| 	response, err := integration.PublishRecord( | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		[]byte("test-key"), | |
| 		&schema_pb.RecordValue{}, | |
| 	) | |
| 
 | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to publish record: %v", err) | |
| 	} | |
| 
 | |
| 	if response.Error != "" { | |
| 		t.Errorf("Expected no error, got: %s", response.Error) | |
| 	} | |
| 
 | |
| 	if response.BaseOffset != 0 { | |
| 		t.Errorf("Expected base offset 0, got %d", response.BaseOffset) | |
| 	} | |
| 
 | |
| 	if response.LastOffset != 0 { | |
| 		t.Errorf("Expected last offset 0, got %d", response.LastOffset) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Create batch of records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 
 | |
| 	// Publish batch | |
| 	response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to publish record batch: %v", err) | |
| 	} | |
| 
 | |
| 	if response.Error != "" { | |
| 		t.Errorf("Expected no error, got: %s", response.Error) | |
| 	} | |
| 
 | |
| 	if response.BaseOffset != 0 { | |
| 		t.Errorf("Expected base offset 0, got %d", response.BaseOffset) | |
| 	} | |
| 
 | |
| 	if response.LastOffset != 2 { | |
| 		t.Errorf("Expected last offset 2, got %d", response.LastOffset) | |
| 	} | |
| 
 | |
| 	// Verify high water mark | |
| 	hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get high water mark: %v", err) | |
| 	} | |
| 
 | |
| 	if hwm != 3 { | |
| 		t.Errorf("Expected high water mark 3, got %d", hwm) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish empty batch | |
| 	response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, []PublishRecordRequest{}) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to publish empty batch: %v", err) | |
| 	} | |
| 
 | |
| 	if response.Error == "" { | |
| 		t.Error("Expected error for empty batch") | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish some records first | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscription | |
| 	sub, err := integration.CreateSubscription( | |
| 		"test-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_RESET_TO_EARLIEST, | |
| 		0, | |
| 	) | |
| 
 | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	if sub.ID != "test-sub" { | |
| 		t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID) | |
| 	} | |
| 
 | |
| 	if sub.StartOffset != 0 { | |
| 		t.Errorf("Expected start offset 0, got %d", sub.StartOffset) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish some records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscription | |
| 	sub, err := integration.CreateSubscription( | |
| 		"test-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_RESET_TO_EARLIEST, | |
| 		0, | |
| 	) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Subscribe to records | |
| 	responses, err := integration.SubscribeRecords(sub, 2) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to subscribe to records: %v", err) | |
| 	} | |
| 
 | |
| 	if len(responses) != 2 { | |
| 		t.Errorf("Expected 2 responses, got %d", len(responses)) | |
| 	} | |
| 
 | |
| 	// Check offset progression | |
| 	if responses[0].Offset != 0 { | |
| 		t.Errorf("Expected first record offset 0, got %d", responses[0].Offset) | |
| 	} | |
| 
 | |
| 	if responses[1].Offset != 1 { | |
| 		t.Errorf("Expected second record offset 1, got %d", responses[1].Offset) | |
| 	} | |
| 
 | |
| 	// Check subscription advancement | |
| 	if sub.CurrentOffset != 2 { | |
| 		t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Create subscription on empty partition | |
| 	sub, err := integration.CreateSubscription( | |
| 		"empty-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_RESET_TO_EARLIEST, | |
| 		0, | |
| 	) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Subscribe to records (should return empty) | |
| 	responses, err := integration.SubscribeRecords(sub, 10) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to subscribe to empty partition: %v", err) | |
| 	} | |
| 
 | |
| 	if len(responses) != 0 { | |
| 		t.Errorf("Expected 0 responses from empty partition, got %d", len(responses)) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key4"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key5"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscription | |
| 	sub, err := integration.CreateSubscription( | |
| 		"seek-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_RESET_TO_EARLIEST, | |
| 		0, | |
| 	) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Seek to offset 3 | |
| 	err = integration.SeekSubscription("seek-sub", 3) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to seek subscription: %v", err) | |
| 	} | |
| 
 | |
| 	if sub.CurrentOffset != 3 { | |
| 		t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset) | |
| 	} | |
| 
 | |
| 	// Subscribe from new position | |
| 	responses, err := integration.SubscribeRecords(sub, 2) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to subscribe after seek: %v", err) | |
| 	} | |
| 
 | |
| 	if len(responses) != 2 { | |
| 		t.Errorf("Expected 2 responses after seek, got %d", len(responses)) | |
| 	} | |
| 
 | |
| 	if responses[0].Offset != 3 { | |
| 		t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscription at offset 1 | |
| 	sub, err := integration.CreateSubscription( | |
| 		"lag-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_EXACT_OFFSET, | |
| 		1, | |
| 	) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Get lag | |
| 	lag, err := integration.GetSubscriptionLag("lag-sub") | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get subscription lag: %v", err) | |
| 	} | |
| 
 | |
| 	expectedLag := int64(3 - 1) // hwm - current | |
| 	if lag != expectedLag { | |
| 		t.Errorf("Expected lag %d, got %d", expectedLag, lag) | |
| 	} | |
| 
 | |
| 	// Advance subscription and check lag again | |
| 	integration.SubscribeRecords(sub, 1) | |
| 
 | |
| 	lag, err = integration.GetSubscriptionLag("lag-sub") | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get lag after advance: %v", err) | |
| 	} | |
| 
 | |
| 	expectedLag = int64(3 - 2) // hwm - current | |
| 	if lag != expectedLag { | |
| 		t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Create subscription | |
| 	_, err := integration.CreateSubscription( | |
| 		"close-sub", | |
| 		"test-namespace", "test-topic", | |
| 		partition, | |
| 		schema_pb.OffsetType_RESET_TO_EARLIEST, | |
| 		0, | |
| 	) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to create subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Close subscription | |
| 	err = integration.CloseSubscription("close-sub") | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to close subscription: %v", err) | |
| 	} | |
| 
 | |
| 	// Try to get lag (should fail) | |
| 	_, err = integration.GetSubscriptionLag("close-sub") | |
| 	if err == nil { | |
| 		t.Error("Expected error when getting lag for closed subscription") | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Publish some records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Test valid range | |
| 	err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 2) | |
| 	if err != nil { | |
| 		t.Errorf("Valid range should not return error: %v", err) | |
| 	} | |
| 
 | |
| 	// Test invalid range (beyond hwm) | |
| 	err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 5) | |
| 	if err == nil { | |
| 		t.Error("Expected error for range beyond high water mark") | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Test empty partition | |
| 	offsetRange, err := integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get available range for empty partition: %v", err) | |
| 	} | |
| 
 | |
| 	if offsetRange.Count != 0 { | |
| 		t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count) | |
| 	} | |
| 
 | |
| 	// Publish records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Test with data | |
| 	offsetRange, err = integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get available range: %v", err) | |
| 	} | |
| 
 | |
| 	if offsetRange.StartOffset != 0 { | |
| 		t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset) | |
| 	} | |
| 
 | |
| 	if offsetRange.EndOffset != 1 { | |
| 		t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset) | |
| 	} | |
| 
 | |
| 	if offsetRange.Count != 2 { | |
| 		t.Errorf("Expected count 2, got %d", offsetRange.Count) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Initial metrics | |
| 	metrics := integration.GetOffsetMetrics() | |
| 	if metrics.TotalOffsets != 0 { | |
| 		t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets) | |
| 	} | |
| 
 | |
| 	if metrics.ActiveSubscriptions != 0 { | |
| 		t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions) | |
| 	} | |
| 
 | |
| 	// Publish records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscriptions | |
| 	integration.CreateSubscription("sub1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) | |
| 	integration.CreateSubscription("sub2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) | |
| 
 | |
| 	// Check updated metrics | |
| 	metrics = integration.GetOffsetMetrics() | |
| 	if metrics.TotalOffsets != 2 { | |
| 		t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets) | |
| 	} | |
| 
 | |
| 	if metrics.ActiveSubscriptions != 2 { | |
| 		t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions) | |
| 	} | |
| 
 | |
| 	if metrics.PartitionCount != 1 { | |
| 		t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Test non-existent offset | |
| 	info, err := integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get offset info: %v", err) | |
| 	} | |
| 
 | |
| 	if info.Exists { | |
| 		t.Error("Offset should not exist in empty partition") | |
| 	} | |
| 
 | |
| 	// Publish record | |
| 	integration.PublishRecord("test-namespace", "test-topic", partition, []byte("key1"), &schema_pb.RecordValue{}) | |
| 
 | |
| 	// Test existing offset | |
| 	info, err = integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get offset info for existing offset: %v", err) | |
| 	} | |
| 
 | |
| 	if !info.Exists { | |
| 		t.Error("Offset should exist after publishing") | |
| 	} | |
| 
 | |
| 	if info.Offset != 0 { | |
| 		t.Errorf("Expected offset 0, got %d", info.Offset) | |
| 	} | |
| } | |
| 
 | |
| func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) { | |
| 	storage := NewInMemoryOffsetStorage() | |
| 	integration := NewSMQOffsetIntegration(storage) | |
| 	partition := createTestPartition() | |
| 
 | |
| 	// Test empty partition | |
| 	info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get partition offset info: %v", err) | |
| 	} | |
| 
 | |
| 	if info.EarliestOffset != 0 { | |
| 		t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) | |
| 	} | |
| 
 | |
| 	if info.LatestOffset != -1 { | |
| 		t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset) | |
| 	} | |
| 
 | |
| 	if info.HighWaterMark != 0 { | |
| 		t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark) | |
| 	} | |
| 
 | |
| 	if info.RecordCount != 0 { | |
| 		t.Errorf("Expected record count 0, got %d", info.RecordCount) | |
| 	} | |
| 
 | |
| 	// Publish records | |
| 	records := []PublishRecordRequest{ | |
| 		{Key: []byte("key1"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key2"), Value: &schema_pb.RecordValue{}}, | |
| 		{Key: []byte("key3"), Value: &schema_pb.RecordValue{}}, | |
| 	} | |
| 	integration.PublishRecordBatch("test-namespace", "test-topic", partition, records) | |
| 
 | |
| 	// Create subscription | |
| 	integration.CreateSubscription("test-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0) | |
| 
 | |
| 	// Test with data | |
| 	info, err = integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition) | |
| 	if err != nil { | |
| 		t.Fatalf("Failed to get partition offset info with data: %v", err) | |
| 	} | |
| 
 | |
| 	if info.EarliestOffset != 0 { | |
| 		t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset) | |
| 	} | |
| 
 | |
| 	if info.LatestOffset != 2 { | |
| 		t.Errorf("Expected latest offset 2, got %d", info.LatestOffset) | |
| 	} | |
| 
 | |
| 	if info.HighWaterMark != 3 { | |
| 		t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark) | |
| 	} | |
| 
 | |
| 	if info.RecordCount != 3 { | |
| 		t.Errorf("Expected record count 3, got %d", info.RecordCount) | |
| 	} | |
| 
 | |
| 	if info.ActiveSubscriptions != 1 { | |
| 		t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions) | |
| 	} | |
| }
 |