diff --git a/weed/mq/kafka/integration/agent_client.go b/weed/mq/kafka/integration/agent_client.go index c972d0d62..dee84e3a0 100644 --- a/weed/mq/kafka/integration/agent_client.go +++ b/weed/mq/kafka/integration/agent_client.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -302,10 +303,12 @@ func (ac *AgentClient) createSubscribeSession(topic string, partition int32, sta }, PartitionOffsets: []*schema_pb.PartitionOffset{ { + // Map Kafka partition to specific SMQ ring range + // Calculate range size dynamically based on MaxPartitionCount Partition: &schema_pb.Partition{ - RingSize: 1024, // Standard ring size - RangeStart: 0, - RangeStop: 1023, + RingSize: pub_balancer.MaxPartitionCount, // Standard ring size + RangeStart: partition * int32(pub_balancer.MaxPartitionCount/32), // Map Kafka partition to ring range + RangeStop: (partition+1)*int32(pub_balancer.MaxPartitionCount/32) - 1, // Dynamic range calculation }, StartTsNs: startOffset, // Use offset as timestamp for now }, diff --git a/weed/mq/kafka/offset/smq_mapping.go b/weed/mq/kafka/offset/smq_mapping.go index 50e69a584..91ebe537c 100644 --- a/weed/mq/kafka/offset/smq_mapping.go +++ b/weed/mq/kafka/offset/smq_mapping.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -35,11 +36,12 @@ func (m *KafkaToSMQMapper) KafkaOffsetToSMQPartitionOffset( // Step 2: Create SMQ Partition // SMQ uses a ring-based partitioning scheme + rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size smqPartition := &schema_pb.Partition{ - RingSize: 1024, // Standard ring size for SMQ - RangeStart: int32(kafkaPartition) * 32, // Map Kafka partition to ring range - RangeStop: (int32(kafkaPartition)+1)*32 - 1, // Each Kafka partition gets 32 ring slots - UnixTimeNs: smqTimestamp, // When this partition mapping was created + RingSize: pub_balancer.MaxPartitionCount, // Standard ring size + RangeStart: int32(kafkaPartition) * rangeSize, // Map Kafka partition to ring range + RangeStop: (int32(kafkaPartition)+1)*rangeSize - 1, // Each Kafka partition gets calculated slots + UnixTimeNs: smqTimestamp, // When this partition mapping was created } // Step 3: Create PartitionOffset with the mapped timestamp @@ -113,10 +115,11 @@ func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest( } // Create SMQ partition mapping + rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size smqPartition := &schema_pb.Partition{ - RingSize: 1024, - RangeStart: int32(kafkaPartition) * 32, - RangeStop: (int32(kafkaPartition)+1)*32 - 1, + RingSize: pub_balancer.MaxPartitionCount, + RangeStart: int32(kafkaPartition) * rangeSize, + RangeStop: (int32(kafkaPartition)+1)*rangeSize - 1, UnixTimeNs: time.Now().UnixNano(), } @@ -131,7 +134,8 @@ func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest( // ExtractKafkaPartitionFromSMQPartition extracts the Kafka partition number from SMQ Partition func ExtractKafkaPartitionFromSMQPartition(smqPartition *schema_pb.Partition) int32 { // Reverse the mapping: SMQ range → Kafka partition - return smqPartition.RangeStart / 32 + rangeSize := int32(pub_balancer.MaxPartitionCount / 32) + return smqPartition.RangeStart / rangeSize } // OffsetMappingInfo provides debugging information about the mapping @@ -155,8 +159,8 @@ func (m *KafkaToSMQMapper) GetMappingInfo(kafkaOffset int64, kafkaPartition int3 KafkaOffset: kafkaOffset, SMQTimestamp: timestamp, KafkaPartition: kafkaPartition, - SMQRangeStart: kafkaPartition * 32, - SMQRangeStop: (kafkaPartition+1)*32 - 1, + SMQRangeStart: kafkaPartition * int32(pub_balancer.MaxPartitionCount/32), + SMQRangeStop: (kafkaPartition+1)*int32(pub_balancer.MaxPartitionCount/32) - 1, MessageSize: size, }, nil } @@ -211,10 +215,11 @@ func (m *KafkaToSMQMapper) CreatePartitionOffsetForTimeRange( startTime int64, ) *schema_pb.PartitionOffset { + rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size smqPartition := &schema_pb.Partition{ - RingSize: 1024, - RangeStart: kafkaPartition * 32, - RangeStop: (kafkaPartition+1)*32 - 1, + RingSize: pub_balancer.MaxPartitionCount, + RangeStart: kafkaPartition * rangeSize, + RangeStop: (kafkaPartition+1)*rangeSize - 1, UnixTimeNs: time.Now().UnixNano(), }