You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
225 lines
6.4 KiB
225 lines
6.4 KiB
package kafka
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
|
|
)
|
|
|
|
func TestPartitionMapper_GetRangeSize(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
rangeSize := mapper.GetRangeSize()
|
|
|
|
if rangeSize != 32 {
|
|
t.Errorf("Expected range size 32, got %d", rangeSize)
|
|
}
|
|
|
|
// Verify that the range size divides evenly into available partitions
|
|
maxPartitions := mapper.GetMaxKafkaPartitions()
|
|
totalUsed := maxPartitions * rangeSize
|
|
|
|
if totalUsed > int32(pub_balancer.MaxPartitionCount) {
|
|
t.Errorf("Total used slots (%d) exceeds MaxPartitionCount (%d)", totalUsed, pub_balancer.MaxPartitionCount)
|
|
}
|
|
|
|
t.Logf("Range size: %d, Max Kafka partitions: %d, Ring utilization: %.2f%%",
|
|
rangeSize, maxPartitions, float64(totalUsed)/float64(pub_balancer.MaxPartitionCount)*100)
|
|
}
|
|
|
|
func TestPartitionMapper_MapKafkaPartitionToSMQRange(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
tests := []struct {
|
|
kafkaPartition int32
|
|
expectedStart int32
|
|
expectedStop int32
|
|
}{
|
|
{0, 0, 31},
|
|
{1, 32, 63},
|
|
{2, 64, 95},
|
|
{10, 320, 351},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run("", func(t *testing.T) {
|
|
start, stop := mapper.MapKafkaPartitionToSMQRange(tt.kafkaPartition)
|
|
|
|
if start != tt.expectedStart {
|
|
t.Errorf("Kafka partition %d: expected start %d, got %d", tt.kafkaPartition, tt.expectedStart, start)
|
|
}
|
|
|
|
if stop != tt.expectedStop {
|
|
t.Errorf("Kafka partition %d: expected stop %d, got %d", tt.kafkaPartition, tt.expectedStop, stop)
|
|
}
|
|
|
|
// Verify range size is consistent
|
|
rangeSize := stop - start + 1
|
|
if rangeSize != mapper.GetRangeSize() {
|
|
t.Errorf("Inconsistent range size: expected %d, got %d", mapper.GetRangeSize(), rangeSize)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_ExtractKafkaPartitionFromSMQRange(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
tests := []struct {
|
|
rangeStart int32
|
|
expectedKafka int32
|
|
}{
|
|
{0, 0},
|
|
{32, 1},
|
|
{64, 2},
|
|
{320, 10},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run("", func(t *testing.T) {
|
|
kafkaPartition := mapper.ExtractKafkaPartitionFromSMQRange(tt.rangeStart)
|
|
|
|
if kafkaPartition != tt.expectedKafka {
|
|
t.Errorf("Range start %d: expected Kafka partition %d, got %d",
|
|
tt.rangeStart, tt.expectedKafka, kafkaPartition)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_RoundTrip(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
// Test round-trip conversion for all valid Kafka partitions
|
|
maxPartitions := mapper.GetMaxKafkaPartitions()
|
|
|
|
for kafkaPartition := int32(0); kafkaPartition < maxPartitions; kafkaPartition++ {
|
|
// Kafka -> SMQ -> Kafka
|
|
rangeStart, rangeStop := mapper.MapKafkaPartitionToSMQRange(kafkaPartition)
|
|
extractedKafka := mapper.ExtractKafkaPartitionFromSMQRange(rangeStart)
|
|
|
|
if extractedKafka != kafkaPartition {
|
|
t.Errorf("Round-trip failed for partition %d: got %d", kafkaPartition, extractedKafka)
|
|
}
|
|
|
|
// Verify no overlap with next partition
|
|
if kafkaPartition < maxPartitions-1 {
|
|
nextStart, _ := mapper.MapKafkaPartitionToSMQRange(kafkaPartition + 1)
|
|
if rangeStop >= nextStart {
|
|
t.Errorf("Partition %d range [%d,%d] overlaps with partition %d start %d",
|
|
kafkaPartition, rangeStart, rangeStop, kafkaPartition+1, nextStart)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_CreateSMQPartition(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
kafkaPartition := int32(5)
|
|
unixTimeNs := time.Now().UnixNano()
|
|
|
|
partition := mapper.CreateSMQPartition(kafkaPartition, unixTimeNs)
|
|
|
|
if partition.RingSize != pub_balancer.MaxPartitionCount {
|
|
t.Errorf("Expected ring size %d, got %d", pub_balancer.MaxPartitionCount, partition.RingSize)
|
|
}
|
|
|
|
expectedStart, expectedStop := mapper.MapKafkaPartitionToSMQRange(kafkaPartition)
|
|
if partition.RangeStart != expectedStart {
|
|
t.Errorf("Expected range start %d, got %d", expectedStart, partition.RangeStart)
|
|
}
|
|
|
|
if partition.RangeStop != expectedStop {
|
|
t.Errorf("Expected range stop %d, got %d", expectedStop, partition.RangeStop)
|
|
}
|
|
|
|
if partition.UnixTimeNs != unixTimeNs {
|
|
t.Errorf("Expected timestamp %d, got %d", unixTimeNs, partition.UnixTimeNs)
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_ValidateKafkaPartition(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
tests := []struct {
|
|
partition int32
|
|
valid bool
|
|
}{
|
|
{-1, false},
|
|
{0, true},
|
|
{1, true},
|
|
{mapper.GetMaxKafkaPartitions() - 1, true},
|
|
{mapper.GetMaxKafkaPartitions(), false},
|
|
{1000, false},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run("", func(t *testing.T) {
|
|
valid := mapper.ValidateKafkaPartition(tt.partition)
|
|
if valid != tt.valid {
|
|
t.Errorf("Partition %d: expected valid=%v, got %v", tt.partition, tt.valid, valid)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_ConsistencyWithGlobalFunctions(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
kafkaPartition := int32(7)
|
|
unixTimeNs := time.Now().UnixNano()
|
|
|
|
// Test that global functions produce same results as mapper methods
|
|
start1, stop1 := mapper.MapKafkaPartitionToSMQRange(kafkaPartition)
|
|
start2, stop2 := MapKafkaPartitionToSMQRange(kafkaPartition)
|
|
|
|
if start1 != start2 || stop1 != stop2 {
|
|
t.Errorf("Global function inconsistent: mapper=(%d,%d), global=(%d,%d)",
|
|
start1, stop1, start2, stop2)
|
|
}
|
|
|
|
partition1 := mapper.CreateSMQPartition(kafkaPartition, unixTimeNs)
|
|
partition2 := CreateSMQPartition(kafkaPartition, unixTimeNs)
|
|
|
|
if partition1.RangeStart != partition2.RangeStart || partition1.RangeStop != partition2.RangeStop {
|
|
t.Errorf("Global CreateSMQPartition inconsistent")
|
|
}
|
|
|
|
extracted1 := mapper.ExtractKafkaPartitionFromSMQRange(start1)
|
|
extracted2 := ExtractKafkaPartitionFromSMQRange(start1)
|
|
|
|
if extracted1 != extracted2 {
|
|
t.Errorf("Global ExtractKafkaPartitionFromSMQRange inconsistent: %d vs %d", extracted1, extracted2)
|
|
}
|
|
}
|
|
|
|
func TestPartitionMapper_GetPartitionMappingInfo(t *testing.T) {
|
|
mapper := NewPartitionMapper()
|
|
|
|
info := mapper.GetPartitionMappingInfo()
|
|
|
|
// Verify all expected keys are present
|
|
expectedKeys := []string{"ring_size", "range_size", "max_kafka_partitions", "ring_utilization"}
|
|
for _, key := range expectedKeys {
|
|
if _, exists := info[key]; !exists {
|
|
t.Errorf("Missing key in mapping info: %s", key)
|
|
}
|
|
}
|
|
|
|
// Verify values are reasonable
|
|
if info["ring_size"].(int) != pub_balancer.MaxPartitionCount {
|
|
t.Errorf("Incorrect ring_size in info")
|
|
}
|
|
|
|
if info["range_size"].(int32) != mapper.GetRangeSize() {
|
|
t.Errorf("Incorrect range_size in info")
|
|
}
|
|
|
|
utilization := info["ring_utilization"].(float64)
|
|
if utilization <= 0 || utilization > 1 {
|
|
t.Errorf("Invalid ring utilization: %f", utilization)
|
|
}
|
|
|
|
t.Logf("Partition mapping info: %+v", info)
|
|
}
|