You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

544 lines
16 KiB

package offset
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestSMQOffsetIntegration_PublishRecord(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish a single record
response, err := integration.PublishRecord(
"test-namespace", "test-topic",
partition,
[]byte("test-key"),
&schema_pb.RecordValue{},
)
if err != nil {
t.Fatalf("Failed to publish record: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 0 {
t.Errorf("Expected last offset 0, got %d", response.LastOffset)
}
}
func TestSMQOffsetIntegration_PublishRecordBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create batch of records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
// Publish batch
response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
if err != nil {
t.Fatalf("Failed to publish record batch: %v", err)
}
if response.Error != "" {
t.Errorf("Expected no error, got: %s", response.Error)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 2 {
t.Errorf("Expected last offset 2, got %d", response.LastOffset)
}
// Verify high water mark
hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get high water mark: %v", err)
}
if hwm != 3 {
t.Errorf("Expected high water mark 3, got %d", hwm)
}
}
func TestSMQOffsetIntegration_EmptyBatch(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish empty batch
response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, []PublishRecordRequest{})
if err != nil {
t.Fatalf("Failed to publish empty batch: %v", err)
}
if response.Error == "" {
t.Error("Expected error for empty batch")
}
}
func TestSMQOffsetIntegration_CreateSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records first
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"test-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
if sub.ID != "test-sub" {
t.Errorf("Expected subscription ID 'test-sub', got %s", sub.ID)
}
if sub.StartOffset != 0 {
t.Errorf("Expected start offset 0, got %d", sub.StartOffset)
}
}
func TestSMQOffsetIntegration_SubscribeRecords(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"test-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records
responses, err := integration.SubscribeRecords(sub, 2)
if err != nil {
t.Fatalf("Failed to subscribe to records: %v", err)
}
if len(responses) != 2 {
t.Errorf("Expected 2 responses, got %d", len(responses))
}
// Check offset progression
if responses[0].Offset != 0 {
t.Errorf("Expected first record offset 0, got %d", responses[0].Offset)
}
if responses[1].Offset != 1 {
t.Errorf("Expected second record offset 1, got %d", responses[1].Offset)
}
// Check subscription advancement
if sub.CurrentOffset != 2 {
t.Errorf("Expected subscription current offset 2, got %d", sub.CurrentOffset)
}
}
func TestSMQOffsetIntegration_SubscribeEmptyPartition(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription on empty partition
sub, err := integration.CreateSubscription(
"empty-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records (should return empty)
responses, err := integration.SubscribeRecords(sub, 10)
if err != nil {
t.Fatalf("Failed to subscribe to empty partition: %v", err)
}
if len(responses) != 0 {
t.Errorf("Expected 0 responses from empty partition, got %d", len(responses))
}
}
func TestSMQOffsetIntegration_SeekSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key4"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key5"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscription
sub, err := integration.CreateSubscription(
"seek-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Seek to offset 3
err = integration.SeekSubscription("seek-sub", 3)
if err != nil {
t.Fatalf("Failed to seek subscription: %v", err)
}
if sub.CurrentOffset != 3 {
t.Errorf("Expected current offset 3 after seek, got %d", sub.CurrentOffset)
}
// Subscribe from new position
responses, err := integration.SubscribeRecords(sub, 2)
if err != nil {
t.Fatalf("Failed to subscribe after seek: %v", err)
}
if len(responses) != 2 {
t.Errorf("Expected 2 responses after seek, got %d", len(responses))
}
if responses[0].Offset != 3 {
t.Errorf("Expected first record offset 3 after seek, got %d", responses[0].Offset)
}
}
func TestSMQOffsetIntegration_GetSubscriptionLag(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscription at offset 1
sub, err := integration.CreateSubscription(
"lag-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_EXACT_OFFSET,
1,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Get lag
lag, err := integration.GetSubscriptionLag("lag-sub")
if err != nil {
t.Fatalf("Failed to get subscription lag: %v", err)
}
expectedLag := int64(3 - 1) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d, got %d", expectedLag, lag)
}
// Advance subscription and check lag again
integration.SubscribeRecords(sub, 1)
lag, err = integration.GetSubscriptionLag("lag-sub")
if err != nil {
t.Fatalf("Failed to get lag after advance: %v", err)
}
expectedLag = int64(3 - 2) // hwm - current
if lag != expectedLag {
t.Errorf("Expected lag %d after advance, got %d", expectedLag, lag)
}
}
func TestSMQOffsetIntegration_CloseSubscription(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Create subscription
_, err := integration.CreateSubscription(
"close-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Close subscription
err = integration.CloseSubscription("close-sub")
if err != nil {
t.Fatalf("Failed to close subscription: %v", err)
}
// Try to get lag (should fail)
_, err = integration.GetSubscriptionLag("close-sub")
if err == nil {
t.Error("Expected error when getting lag for closed subscription")
}
}
func TestSMQOffsetIntegration_ValidateOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Publish some records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Test valid range
err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 2)
if err != nil {
t.Errorf("Valid range should not return error: %v", err)
}
// Test invalid range (beyond hwm)
err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 0, 5)
if err == nil {
t.Error("Expected error for range beyond high water mark")
}
}
func TestSMQOffsetIntegration_GetAvailableOffsetRange(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test empty partition
offsetRange, err := integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get available range for empty partition: %v", err)
}
if offsetRange.Count != 0 {
t.Errorf("Expected empty range for empty partition, got count %d", offsetRange.Count)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Test with data
offsetRange, err = integration.GetAvailableOffsetRange("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get available range: %v", err)
}
if offsetRange.StartOffset != 0 {
t.Errorf("Expected start offset 0, got %d", offsetRange.StartOffset)
}
if offsetRange.EndOffset != 1 {
t.Errorf("Expected end offset 1, got %d", offsetRange.EndOffset)
}
if offsetRange.Count != 2 {
t.Errorf("Expected count 2, got %d", offsetRange.Count)
}
}
func TestSMQOffsetIntegration_GetOffsetMetrics(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Initial metrics
metrics := integration.GetOffsetMetrics()
if metrics.TotalOffsets != 0 {
t.Errorf("Expected 0 total offsets initially, got %d", metrics.TotalOffsets)
}
if metrics.ActiveSubscriptions != 0 {
t.Errorf("Expected 0 active subscriptions initially, got %d", metrics.ActiveSubscriptions)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscriptions
integration.CreateSubscription("sub1", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
integration.CreateSubscription("sub2", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
// Check updated metrics
metrics = integration.GetOffsetMetrics()
if metrics.TotalOffsets != 2 {
t.Errorf("Expected 2 total offsets, got %d", metrics.TotalOffsets)
}
if metrics.ActiveSubscriptions != 2 {
t.Errorf("Expected 2 active subscriptions, got %d", metrics.ActiveSubscriptions)
}
if metrics.PartitionCount != 1 {
t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
}
}
func TestSMQOffsetIntegration_GetOffsetInfo(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test non-existent offset
info, err := integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
if err != nil {
t.Fatalf("Failed to get offset info: %v", err)
}
if info.Exists {
t.Error("Offset should not exist in empty partition")
}
// Publish record
integration.PublishRecord("test-namespace", "test-topic", partition, []byte("key1"), &schema_pb.RecordValue{})
// Test existing offset
info, err = integration.GetOffsetInfo("test-namespace", "test-topic", partition, 0)
if err != nil {
t.Fatalf("Failed to get offset info for existing offset: %v", err)
}
if !info.Exists {
t.Error("Offset should exist after publishing")
}
if info.Offset != 0 {
t.Errorf("Expected offset 0, got %d", info.Offset)
}
}
func TestSMQOffsetIntegration_GetPartitionOffsetInfo(t *testing.T) {
storage := NewInMemoryOffsetStorage()
integration := NewSMQOffsetIntegration(storage)
partition := createTestPartition()
// Test empty partition
info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get partition offset info: %v", err)
}
if info.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
}
if info.LatestOffset != -1 {
t.Errorf("Expected latest offset -1 for empty partition, got %d", info.LatestOffset)
}
if info.HighWaterMark != 0 {
t.Errorf("Expected high water mark 0, got %d", info.HighWaterMark)
}
if info.RecordCount != 0 {
t.Errorf("Expected record count 0, got %d", info.RecordCount)
}
// Publish records
records := []PublishRecordRequest{
{Key: []byte("key1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("key3"), Value: &schema_pb.RecordValue{}},
}
integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
// Create subscription
integration.CreateSubscription("test-sub", "test-namespace", "test-topic", partition, schema_pb.OffsetType_RESET_TO_EARLIEST, 0)
// Test with data
info, err = integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get partition offset info with data: %v", err)
}
if info.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
}
if info.LatestOffset != 2 {
t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
}
if info.HighWaterMark != 3 {
t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
}
if info.RecordCount != 3 {
t.Errorf("Expected record count 3, got %d", info.RecordCount)
}
if info.ActiveSubscriptions != 1 {
t.Errorf("Expected 1 active subscription, got %d", info.ActiveSubscriptions)
}
}