You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

472 lines
12 KiB

package offset
import (
"fmt"
"os"
"testing"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// TestEndToEndOffsetFlow tests the complete offset management flow
func TestEndToEndOffsetFlow(t *testing.T) {
// Create temporary database
tmpFile, err := os.CreateTemp("", "e2e_offset_test_*.db")
if err != nil {
t.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
// Create database with migrations
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
// Create SQL storage
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
// Create SMQ offset integration
integration := NewSMQOffsetIntegration(storage)
// Test partition
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
t.Run("PublishAndAssignOffsets", func(t *testing.T) {
// Simulate publishing messages with offset assignment
records := []PublishRecordRequest{
{Key: []byte("user1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("user2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("user3"), Value: &schema_pb.RecordValue{}},
}
response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
if err != nil {
t.Fatalf("Failed to publish record batch: %v", err)
}
if response.BaseOffset != 0 {
t.Errorf("Expected base offset 0, got %d", response.BaseOffset)
}
if response.LastOffset != 2 {
t.Errorf("Expected last offset 2, got %d", response.LastOffset)
}
// Verify high water mark
hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get high water mark: %v", err)
}
if hwm != 3 {
t.Errorf("Expected high water mark 3, got %d", hwm)
}
})
t.Run("CreateAndUseSubscription", func(t *testing.T) {
// Create subscription from earliest
sub, err := integration.CreateSubscription(
"e2e-test-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create subscription: %v", err)
}
// Subscribe to records
responses, err := integration.SubscribeRecords(sub, 2)
if err != nil {
t.Fatalf("Failed to subscribe to records: %v", err)
}
if len(responses) != 2 {
t.Errorf("Expected 2 responses, got %d", len(responses))
}
// Check subscription advancement
if sub.CurrentOffset != 2 {
t.Errorf("Expected current offset 2, got %d", sub.CurrentOffset)
}
// Get subscription lag
lag, err := sub.GetLag()
if err != nil {
t.Fatalf("Failed to get lag: %v", err)
}
if lag != 1 { // 3 (hwm) - 2 (current) = 1
t.Errorf("Expected lag 1, got %d", lag)
}
})
t.Run("OffsetSeekingAndRanges", func(t *testing.T) {
// Create subscription at specific offset
sub, err := integration.CreateSubscription(
"seek-test-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_EXACT_OFFSET,
1,
)
if err != nil {
t.Fatalf("Failed to create subscription at offset 1: %v", err)
}
// Verify starting position
if sub.CurrentOffset != 1 {
t.Errorf("Expected current offset 1, got %d", sub.CurrentOffset)
}
// Get offset range
offsetRange, err := sub.GetOffsetRange(2)
if err != nil {
t.Fatalf("Failed to get offset range: %v", err)
}
if offsetRange.StartOffset != 1 {
t.Errorf("Expected start offset 1, got %d", offsetRange.StartOffset)
}
if offsetRange.Count != 2 {
t.Errorf("Expected count 2, got %d", offsetRange.Count)
}
// Seek to different offset
err = sub.SeekToOffset(0)
if err != nil {
t.Fatalf("Failed to seek to offset 0: %v", err)
}
if sub.CurrentOffset != 0 {
t.Errorf("Expected current offset 0 after seek, got %d", sub.CurrentOffset)
}
})
t.Run("PartitionInformationAndMetrics", func(t *testing.T) {
// Get partition offset info
info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get partition offset info: %v", err)
}
if info.EarliestOffset != 0 {
t.Errorf("Expected earliest offset 0, got %d", info.EarliestOffset)
}
if info.LatestOffset != 2 {
t.Errorf("Expected latest offset 2, got %d", info.LatestOffset)
}
if info.HighWaterMark != 3 {
t.Errorf("Expected high water mark 3, got %d", info.HighWaterMark)
}
if info.ActiveSubscriptions != 2 { // Two subscriptions created above
t.Errorf("Expected 2 active subscriptions, got %d", info.ActiveSubscriptions)
}
// Get offset metrics
metrics := integration.GetOffsetMetrics()
if metrics.PartitionCount != 1 {
t.Errorf("Expected 1 partition, got %d", metrics.PartitionCount)
}
if metrics.ActiveSubscriptions != 2 {
t.Errorf("Expected 2 active subscriptions in metrics, got %d", metrics.ActiveSubscriptions)
}
})
}
// TestOffsetPersistenceAcrossRestarts tests that offsets persist across system restarts
func TestOffsetPersistenceAcrossRestarts(t *testing.T) {
// Create temporary database
tmpFile, err := os.CreateTemp("", "persistence_test_*.db")
if err != nil {
t.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
var lastOffset int64
// First session: Create database and assign offsets
{
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
integration := NewSMQOffsetIntegration(storage)
// Publish some records
records := []PublishRecordRequest{
{Key: []byte("msg1"), Value: &schema_pb.RecordValue{}},
{Key: []byte("msg2"), Value: &schema_pb.RecordValue{}},
{Key: []byte("msg3"), Value: &schema_pb.RecordValue{}},
}
response, err := integration.PublishRecordBatch("test-namespace", "test-topic", partition, records)
if err != nil {
t.Fatalf("Failed to publish records: %v", err)
}
lastOffset = response.LastOffset
// Close connections
storage.Close()
db.Close()
}
// Second session: Reopen database and verify persistence
{
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to reopen database: %v", err)
}
defer db.Close()
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
integration := NewSMQOffsetIntegration(storage)
// Verify high water mark persisted
hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get high water mark after restart: %v", err)
}
if hwm != lastOffset+1 {
t.Errorf("Expected high water mark %d after restart, got %d", lastOffset+1, hwm)
}
// Assign new offsets and verify continuity
newResponse, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte("msg4"), &schema_pb.RecordValue{})
if err != nil {
t.Fatalf("Failed to publish new record after restart: %v", err)
}
expectedNextOffset := lastOffset + 1
if newResponse.BaseOffset != expectedNextOffset {
t.Errorf("Expected next offset %d after restart, got %d", expectedNextOffset, newResponse.BaseOffset)
}
}
}
// TestConcurrentOffsetOperations tests concurrent offset operations
func TestConcurrentOffsetOperations(t *testing.T) {
// Create temporary database
tmpFile, err := os.CreateTemp("", "concurrent_test_*.db")
if err != nil {
t.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
integration := NewSMQOffsetIntegration(storage)
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
// Concurrent publishers
const numPublishers = 5
const recordsPerPublisher = 10
done := make(chan bool, numPublishers)
for i := 0; i < numPublishers; i++ {
go func(publisherID int) {
defer func() { done <- true }()
for j := 0; j < recordsPerPublisher; j++ {
key := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
_, err := integration.PublishRecord("test-namespace", "test-topic", partition, []byte(key), &schema_pb.RecordValue{})
if err != nil {
t.Errorf("Publisher %d failed to publish message %d: %v", publisherID, j, err)
return
}
}
}(i)
}
// Wait for all publishers to complete
for i := 0; i < numPublishers; i++ {
<-done
}
// Verify total records
hwm, err := integration.GetHighWaterMark("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get high water mark: %v", err)
}
expectedTotal := int64(numPublishers * recordsPerPublisher)
if hwm != expectedTotal {
t.Errorf("Expected high water mark %d, got %d", expectedTotal, hwm)
}
// Verify no duplicate offsets
info, err := integration.GetPartitionOffsetInfo("test-namespace", "test-topic", partition)
if err != nil {
t.Fatalf("Failed to get partition info: %v", err)
}
if info.RecordCount != expectedTotal {
t.Errorf("Expected record count %d, got %d", expectedTotal, info.RecordCount)
}
}
// TestOffsetValidationAndErrorHandling tests error conditions and validation
func TestOffsetValidationAndErrorHandling(t *testing.T) {
// Create temporary database
tmpFile, err := os.CreateTemp("", "validation_test_*.db")
if err != nil {
t.Fatalf("Failed to create temp database: %v", err)
}
tmpFile.Close()
defer os.Remove(tmpFile.Name())
db, err := CreateDatabase(tmpFile.Name())
if err != nil {
t.Fatalf("Failed to create database: %v", err)
}
defer db.Close()
storage, err := NewSQLOffsetStorage(db)
if err != nil {
t.Fatalf("Failed to create SQL storage: %v", err)
}
defer storage.Close()
integration := NewSMQOffsetIntegration(storage)
partition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: 0,
RangeStop: 31,
UnixTimeNs: time.Now().UnixNano(),
}
t.Run("InvalidOffsetSubscription", func(t *testing.T) {
// Try to create subscription with invalid offset
_, err := integration.CreateSubscription(
"invalid-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_EXACT_OFFSET,
100, // Beyond any existing data
)
if err == nil {
t.Error("Expected error for subscription beyond high water mark")
}
})
t.Run("NegativeOffsetValidation", func(t *testing.T) {
// Try to create subscription with negative offset
_, err := integration.CreateSubscription(
"negative-sub",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_EXACT_OFFSET,
-1,
)
if err == nil {
t.Error("Expected error for negative offset")
}
})
t.Run("DuplicateSubscriptionID", func(t *testing.T) {
// Create first subscription
_, err := integration.CreateSubscription(
"duplicate-id",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err != nil {
t.Fatalf("Failed to create first subscription: %v", err)
}
// Try to create duplicate
_, err = integration.CreateSubscription(
"duplicate-id",
"test-namespace", "test-topic",
partition,
schema_pb.OffsetType_RESET_TO_EARLIEST,
0,
)
if err == nil {
t.Error("Expected error for duplicate subscription ID")
}
})
t.Run("OffsetRangeValidation", func(t *testing.T) {
// Add some data first
integration.PublishRecord("test-namespace", "test-topic", partition, []byte("test"), &schema_pb.RecordValue{})
// Test invalid range validation
err := integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 5, 10) // Beyond high water mark
if err == nil {
t.Error("Expected error for range beyond high water mark")
}
err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, 10, 5) // End before start
if err == nil {
t.Error("Expected error for end offset before start offset")
}
err = integration.ValidateOffsetRange("test-namespace", "test-topic", partition, -1, 5) // Negative start
if err == nil {
t.Error("Expected error for negative start offset")
}
})
}