diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 19e68bed1..75938168c 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -810,7 +811,7 @@ func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*Br Name: topic, }, Partition: &schema_pb.Partition{ - RingSize: 1024, // Standard ring size + RingSize: pub_balancer.MaxPartitionCount, // Use the standard ring size constant RangeStart: partition * 32, RangeStop: (partition+1)*32 - 1, }, @@ -869,7 +870,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta }, PartitionOffset: &schema_pb.PartitionOffset{ Partition: &schema_pb.Partition{ - RingSize: 1024, + RingSize: pub_balancer.MaxPartitionCount, RangeStart: partition * 32, RangeStop: (partition+1)*32 - 1, }, diff --git a/weed/mq/kafka/offset/persistence.go b/weed/mq/kafka/offset/persistence.go index 116342b66..7bb01b413 100644 --- a/weed/mq/kafka/offset/persistence.go +++ b/weed/mq/kafka/offset/persistence.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -207,9 +208,9 @@ func (s *SeaweedMQStorage) LoadOffsetMappings(topicPartition string) ([]OffsetEn PartitionOffsets: []*schema_pb.PartitionOffset{ { Partition: &schema_pb.Partition{ - RingSize: 1024, + RingSize: pub_balancer.MaxPartitionCount, RangeStart: 0, - RangeStop: 1023, + RangeStop: pub_balancer.MaxPartitionCount - 1, }, StartTsNs: 0, // Read from beginning },