Browse Source

ring size MaxPartitionCount

pull/7231/head
chrislu 2 months ago
parent
commit
5c17bba00b
  1. 9
      weed/mq/kafka/integration/agent_client.go
  2. 29
      weed/mq/kafka/offset/smq_mapping.go

9
weed/mq/kafka/integration/agent_client.go

@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
) )
@ -302,10 +303,12 @@ func (ac *AgentClient) createSubscribeSession(topic string, partition int32, sta
}, },
PartitionOffsets: []*schema_pb.PartitionOffset{ PartitionOffsets: []*schema_pb.PartitionOffset{
{ {
// Map Kafka partition to specific SMQ ring range
// Calculate range size dynamically based on MaxPartitionCount
Partition: &schema_pb.Partition{ Partition: &schema_pb.Partition{
RingSize: 1024, // Standard ring size
RangeStart: 0,
RangeStop: 1023,
RingSize: pub_balancer.MaxPartitionCount, // Standard ring size
RangeStart: partition * int32(pub_balancer.MaxPartitionCount/32), // Map Kafka partition to ring range
RangeStop: (partition+1)*int32(pub_balancer.MaxPartitionCount/32) - 1, // Dynamic range calculation
}, },
StartTsNs: startOffset, // Use offset as timestamp for now StartTsNs: startOffset, // Use offset as timestamp for now
}, },

29
weed/mq/kafka/offset/smq_mapping.go

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
) )
@ -35,10 +36,11 @@ func (m *KafkaToSMQMapper) KafkaOffsetToSMQPartitionOffset(
// Step 2: Create SMQ Partition // Step 2: Create SMQ Partition
// SMQ uses a ring-based partitioning scheme // SMQ uses a ring-based partitioning scheme
rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size
smqPartition := &schema_pb.Partition{ smqPartition := &schema_pb.Partition{
RingSize: 1024, // Standard ring size for SMQ
RangeStart: int32(kafkaPartition) * 32, // Map Kafka partition to ring range
RangeStop: (int32(kafkaPartition)+1)*32 - 1, // Each Kafka partition gets 32 ring slots
RingSize: pub_balancer.MaxPartitionCount, // Standard ring size
RangeStart: int32(kafkaPartition) * rangeSize, // Map Kafka partition to ring range
RangeStop: (int32(kafkaPartition)+1)*rangeSize - 1, // Each Kafka partition gets calculated slots
UnixTimeNs: smqTimestamp, // When this partition mapping was created UnixTimeNs: smqTimestamp, // When this partition mapping was created
} }
@ -113,10 +115,11 @@ func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest(
} }
// Create SMQ partition mapping // Create SMQ partition mapping
rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size
smqPartition := &schema_pb.Partition{ smqPartition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: int32(kafkaPartition) * 32,
RangeStop: (int32(kafkaPartition)+1)*32 - 1,
RingSize: pub_balancer.MaxPartitionCount,
RangeStart: int32(kafkaPartition) * rangeSize,
RangeStop: (int32(kafkaPartition)+1)*rangeSize - 1,
UnixTimeNs: time.Now().UnixNano(), UnixTimeNs: time.Now().UnixNano(),
} }
@ -131,7 +134,8 @@ func (m *KafkaToSMQMapper) CreateSMQSubscriptionRequest(
// ExtractKafkaPartitionFromSMQPartition extracts the Kafka partition number from SMQ Partition // ExtractKafkaPartitionFromSMQPartition extracts the Kafka partition number from SMQ Partition
func ExtractKafkaPartitionFromSMQPartition(smqPartition *schema_pb.Partition) int32 { func ExtractKafkaPartitionFromSMQPartition(smqPartition *schema_pb.Partition) int32 {
// Reverse the mapping: SMQ range → Kafka partition // Reverse the mapping: SMQ range → Kafka partition
return smqPartition.RangeStart / 32
rangeSize := int32(pub_balancer.MaxPartitionCount / 32)
return smqPartition.RangeStart / rangeSize
} }
// OffsetMappingInfo provides debugging information about the mapping // OffsetMappingInfo provides debugging information about the mapping
@ -155,8 +159,8 @@ func (m *KafkaToSMQMapper) GetMappingInfo(kafkaOffset int64, kafkaPartition int3
KafkaOffset: kafkaOffset, KafkaOffset: kafkaOffset,
SMQTimestamp: timestamp, SMQTimestamp: timestamp,
KafkaPartition: kafkaPartition, KafkaPartition: kafkaPartition,
SMQRangeStart: kafkaPartition * 32,
SMQRangeStop: (kafkaPartition+1)*32 - 1,
SMQRangeStart: kafkaPartition * int32(pub_balancer.MaxPartitionCount/32),
SMQRangeStop: (kafkaPartition+1)*int32(pub_balancer.MaxPartitionCount/32) - 1,
MessageSize: size, MessageSize: size,
}, nil }, nil
} }
@ -211,10 +215,11 @@ func (m *KafkaToSMQMapper) CreatePartitionOffsetForTimeRange(
startTime int64, startTime int64,
) *schema_pb.PartitionOffset { ) *schema_pb.PartitionOffset {
rangeSize := int32(pub_balancer.MaxPartitionCount / 32) // Calculate dynamic range size
smqPartition := &schema_pb.Partition{ smqPartition := &schema_pb.Partition{
RingSize: 1024,
RangeStart: kafkaPartition * 32,
RangeStop: (kafkaPartition+1)*32 - 1,
RingSize: pub_balancer.MaxPartitionCount,
RangeStart: kafkaPartition * rangeSize,
RangeStop: (kafkaPartition+1)*rangeSize - 1,
UnixTimeNs: time.Now().UnixNano(), UnixTimeNs: time.Now().UnixNano(),
} }

Loading…
Cancel
Save