package offset

import (
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestKafkaToSMQMapping exercises the mapper in both directions (Kafka offset
// to SMQ partition offset and back) against a ledger populated with four
// records, and checks the partition range mapping for several Kafka partitions.
func TestKafkaToSMQMapping(t *testing.T) {
	// Create a ledger with some test data
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	// Add some test records
	baseTime := time.Now().UnixNano()
	testRecords := []struct {
		kafkaOffset int64
		timestamp   int64
		size        int32
	}{
		{0, baseTime + 1000, 100},
		{1, baseTime + 2000, 150},
		{2, baseTime + 3000, 200},
		{3, baseTime + 4000, 120},
	}

	// Populate the ledger; each assigned offset must match the expected
	// Kafka offset of the record being appended.
	for _, record := range testRecords {
		offset := ledger.AssignOffsets(1)
		require.Equal(t, record.kafkaOffset, offset)
		err := ledger.AppendRecord(record.kafkaOffset, record.timestamp, record.size)
		require.NoError(t, err)
	}

	t.Run("KafkaOffsetToSMQPartitionOffset", func(t *testing.T) {
		kafkaPartition := int32(0)
		kafkaOffset := int64(1)

		partitionOffset, err := mapper.KafkaOffsetToSMQPartitionOffset(
			kafkaOffset, "test-topic", kafkaPartition)
		require.NoError(t, err)

		// Verify the mapping
		assert.Equal(t, baseTime+2000, partitionOffset.StartTsNs)
		assert.Equal(t, int32(2520), partitionOffset.Partition.RingSize)

		start, stop := kafka.MapKafkaPartitionToSMQRange(kafkaPartition)
		assert.Equal(t, start, partitionOffset.Partition.RangeStart)
		assert.Equal(t, stop, partitionOffset.Partition.RangeStop)

		t.Logf("Kafka offset %d → SMQ timestamp %d", kafkaOffset, partitionOffset.StartTsNs)
	})

	t.Run("SMQPartitionOffsetToKafkaOffset", func(t *testing.T) {
		// Create a partition offset
		partitionOffset := &schema_pb.PartitionOffset{
			StartTsNs: baseTime + 3000, // This should map to Kafka offset 2
		}

		kafkaOffset, err := mapper.SMQPartitionOffsetToKafkaOffset(partitionOffset)
		require.NoError(t, err)
		assert.Equal(t, int64(2), kafkaOffset)

		t.Logf("SMQ timestamp %d → Kafka offset %d", partitionOffset.StartTsNs, kafkaOffset)
	})

	t.Run("MultiplePartitionMapping", func(t *testing.T) {
		testPartitions := []int32{0, 1, 2, 15}

		for _, kp := range testPartitions {
			partitionOffset, err := mapper.KafkaOffsetToSMQPartitionOffset(
				0, "test-topic", kp)
			require.NoError(t, err)

			start, stop := kafka.MapKafkaPartitionToSMQRange(kp)
			assert.Equal(t, start, partitionOffset.Partition.RangeStart)
			assert.Equal(t, stop, partitionOffset.Partition.RangeStop)

			// Verify reverse mapping
			extractedPartition := ExtractKafkaPartitionFromSMQPartition(partitionOffset.Partition)
			assert.Equal(t, kp, extractedPartition)

			t.Logf("Kafka partition %d → SMQ range [%d, %d]", kp, start, stop)
		}
	})
}

// TestCreateSMQSubscriptionRequest checks the offset type and start timestamp
// returned by CreateSMQSubscriptionRequest for a specific offset, the -2
// (EARLIEST) and -1 (LATEST) sentinels, and an offset beyond the five records
// held by the ledger.
func TestCreateSMQSubscriptionRequest(t *testing.T) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	// Add some test data
	baseTime := time.Now().UnixNano()
	for i := int64(0); i < 5; i++ {
		offset := ledger.AssignOffsets(1)
		err := ledger.AppendRecord(offset, baseTime+i*1000, 100)
		require.NoError(t, err)
	}

	t.Run("SpecificOffset", func(t *testing.T) {
		partitionOffset, offsetType, err := mapper.CreateSMQSubscriptionRequest(
			"test-topic", 0, 2, "test-group")
		require.NoError(t, err)

		assert.Equal(t, schema_pb.OffsetType_EXACT_TS_NS, offsetType)
		assert.Equal(t, baseTime+2000, partitionOffset.StartTsNs)

		start, stop := kafka.MapKafkaPartitionToSMQRange(0)
		assert.Equal(t, start, partitionOffset.Partition.RangeStart)
		assert.Equal(t, stop, partitionOffset.Partition.RangeStop)

		t.Logf("Specific offset 2 → SMQ timestamp %d", partitionOffset.StartTsNs)
	})

	t.Run("EarliestOffset", func(t *testing.T) {
		partitionOffset, offsetType, err := mapper.CreateSMQSubscriptionRequest(
			"test-topic", 0, -2, "test-group")
		require.NoError(t, err)

		assert.Equal(t, schema_pb.OffsetType_RESET_TO_EARLIEST, offsetType)
		assert.Equal(t, baseTime, partitionOffset.StartTsNs)

		t.Logf("EARLIEST → SMQ timestamp %d", partitionOffset.StartTsNs)
	})

	t.Run("LatestOffset", func(t *testing.T) {
		partitionOffset, offsetType, err := mapper.CreateSMQSubscriptionRequest(
			"test-topic", 0, -1, "test-group")
		require.NoError(t, err)

		assert.Equal(t, schema_pb.OffsetType_RESET_TO_LATEST, offsetType)
		assert.Equal(t, baseTime+4000, partitionOffset.StartTsNs)

		t.Logf("LATEST → SMQ timestamp %d", partitionOffset.StartTsNs)
	})

	t.Run("FutureOffset", func(t *testing.T) {
		// Request offset beyond high water mark
		partitionOffset, offsetType, err := mapper.CreateSMQSubscriptionRequest(
			"test-topic", 0, 10, "test-group")
		require.NoError(t, err)

		assert.Equal(t, schema_pb.OffsetType_EXACT_TS_NS, offsetType)
		// Should use current time for future offsets. assert.Greater reports
		// both operands on failure, unlike assert.True on a comparison.
		assert.Greater(t, partitionOffset.StartTsNs, baseTime+4000)

		t.Logf("Future offset 10 → SMQ timestamp %d (current time)", partitionOffset.StartTsNs)
	})
}

// TestMappingValidation checks that ValidateMapping accepts a ledger with
// sequential offsets and rejects one whose entries contain an offset gap.
func TestMappingValidation(t *testing.T) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	t.Run("ValidSequentialMapping", func(t *testing.T) {
		baseTime := time.Now().UnixNano()

		// Add sequential records
		for i := int64(0); i < 3; i++ {
			offset := ledger.AssignOffsets(1)
			err := ledger.AppendRecord(offset, baseTime+i*1000, 100)
			require.NoError(t, err)
		}

		err := mapper.ValidateMapping("test-topic", 0)
		assert.NoError(t, err)
	})

	t.Run("InvalidNonSequentialOffsets", func(t *testing.T) {
		ledger2 := NewLedger()
		mapper2 := NewKafkaToSMQMapper(ledger2)

		baseTime := time.Now().UnixNano()

		// Manually create non-sequential offsets (this shouldn't happen in practice)
		ledger2.entries = []OffsetEntry{
			{KafkaOffset: 0, Timestamp: baseTime, Size: 100},
			{KafkaOffset: 2, Timestamp: baseTime + 1000, Size: 100}, // Gap!
		}

		err := mapper2.ValidateMapping("test-topic", 0)
		// require (not assert) so a nil error fails the test here instead of
		// panicking on err.Error() below.
		require.Error(t, err)
		assert.Contains(t, err.Error(), "non-sequential")
	})
}

// TestGetMappingInfo checks every field of the mapping info returned for
// Kafka offset 0 on Kafka partition 2 after a single 150-byte record has been
// appended to the ledger.
func TestGetMappingInfo(t *testing.T) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	baseTime := time.Now().UnixNano()
	offset := ledger.AssignOffsets(1)
	err := ledger.AppendRecord(offset, baseTime, 150)
	require.NoError(t, err)

	info, err := mapper.GetMappingInfo(0, 2)
	require.NoError(t, err)

	assert.Equal(t, int64(0), info.KafkaOffset)
	assert.Equal(t, baseTime, info.SMQTimestamp)
	assert.Equal(t, int32(2), info.KafkaPartition)

	start, stop := kafka.MapKafkaPartitionToSMQRange(2)
	assert.Equal(t, start, info.SMQRangeStart)
	assert.Equal(t, stop, info.SMQRangeStop)
	assert.Equal(t, int32(150), info.MessageSize)

	t.Logf("Mapping info: Kafka %d:%d → SMQ %d [%d-%d] (%d bytes)",
		info.KafkaPartition, info.KafkaOffset, info.SMQTimestamp,
		info.SMQRangeStart, info.SMQRangeStop, info.MessageSize)
}

// TestGetOffsetRange checks that GetOffsetRange translates a timestamp window
// into the enclosed Kafka offsets, and returns an error when no record falls
// inside the window.
func TestGetOffsetRange(t *testing.T) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	baseTime := time.Now().UnixNano()
	timestamps := []int64{
		baseTime + 1000,
		baseTime + 2000,
		baseTime + 3000,
		baseTime + 4000,
		baseTime + 5000,
	}

	// Add records
	for i, timestamp := range timestamps {
		offset := ledger.AssignOffsets(1)
		err := ledger.AppendRecord(offset, timestamp, 100)
		require.NoError(t, err, "Failed to add record %d", i)
	}

	t.Run("FullRange", func(t *testing.T) {
		startOffset, endOffset, err := mapper.GetOffsetRange(
			baseTime+1500, baseTime+4500)
		require.NoError(t, err)

		assert.Equal(t, int64(1), startOffset) // First offset >= baseTime+1500
		assert.Equal(t, int64(3), endOffset)   // Last offset <= baseTime+4500

		t.Logf("Time range [%d, %d] → Kafka offsets [%d, %d]",
			baseTime+1500, baseTime+4500, startOffset, endOffset)
	})

	t.Run("NoMatchingRange", func(t *testing.T) {
		_, _, err := mapper.GetOffsetRange(baseTime+10000, baseTime+20000)
		// require (not assert) so a nil error fails the test here instead of
		// panicking on err.Error() below.
		require.Error(t, err)
		assert.Contains(t, err.Error(), "no offsets found")
	})
}

// TestCreatePartitionOffsetForTimeRange checks that the partition offset built
// for Kafka partition 5 carries the requested start timestamp, the expected
// ring size, and the range returned by kafka.MapKafkaPartitionToSMQRange.
func TestCreatePartitionOffsetForTimeRange(t *testing.T) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	startTime := time.Now().UnixNano()
	kafkaPartition := int32(5)

	partitionOffset := mapper.CreatePartitionOffsetForTimeRange(kafkaPartition, startTime)

	assert.Equal(t, startTime, partitionOffset.StartTsNs)
	assert.Equal(t, int32(2520), partitionOffset.Partition.RingSize)

	start, stop := kafka.MapKafkaPartitionToSMQRange(kafkaPartition)
	assert.Equal(t, start, partitionOffset.Partition.RangeStart)
	assert.Equal(t, stop, partitionOffset.Partition.RangeStop)

	t.Logf("Kafka partition %d time range → SMQ PartitionOffset [%d-%d] @ %d",
		kafkaPartition, partitionOffset.Partition.RangeStart,
		partitionOffset.Partition.RangeStop, partitionOffset.StartTsNs)
}

// BenchmarkMapping tests the performance of offset mapping operations
// against a ledger pre-populated with 1000 records.
func BenchmarkMapping(b *testing.B) {
	ledger := NewLedger()
	mapper := NewKafkaToSMQMapper(ledger)

	// Populate with test data. A failed append would leave the ledger short
	// and make the measurements meaningless, so abort instead of ignoring it.
	baseTime := time.Now().UnixNano()
	for i := int64(0); i < 1000; i++ {
		offset := ledger.AssignOffsets(1)
		if err := ledger.AppendRecord(offset, baseTime+i*1000, 100); err != nil {
			b.Fatal(err)
		}
	}

	b.Run("KafkaToSMQ", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			kafkaOffset := int64(i % 1000)
			_, err := mapper.KafkaOffsetToSMQPartitionOffset(kafkaOffset, "test", 0)
			if err != nil {
				b.Fatal(err)
			}
		}
	})

	b.Run("SMQToKafka", func(b *testing.B) {
		partitionOffset := &schema_pb.PartitionOffset{
			StartTsNs: baseTime + 500000, // Middle timestamp
		}
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err := mapper.SMQPartitionOffsetToKafkaOffset(partitionOffset)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}