|
@ -24,6 +24,7 @@ func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor |
|
|
topic: topic.FromPbTopic(t), |
|
|
topic: topic.FromPbTopic(t), |
|
|
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), |
|
|
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), |
|
|
filerClientAccessor: filerClientAccessor, |
|
|
filerClientAccessor: filerClientAccessor, |
|
|
|
|
|
stopCh: make(chan struct{}), |
|
|
} |
|
|
} |
|
|
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil { |
|
|
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil { |
|
|
var partitions []topic.Partition |
|
|
var partitions []topic.Partition |
|
@ -53,8 +54,8 @@ func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor |
|
|
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ |
|
|
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ |
|
|
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{ |
|
|
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{ |
|
|
PartitionAssignment: &mq_pb.BrokerPartitionAssignment{ |
|
|
PartitionAssignment: &mq_pb.BrokerPartitionAssignment{ |
|
|
Partition: adjustment.partition.ToPbPartition(), |
|
|
|
|
|
LeaderBroker: assignment.LeaderBroker, |
|
|
|
|
|
|
|
|
Partition: adjustment.partition.ToPbPartition(), |
|
|
|
|
|
LeaderBroker: assignment.LeaderBroker, |
|
|
FollowerBroker: assignment.FollowerBroker, |
|
|
FollowerBroker: assignment.FollowerBroker, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|
}, |
|
|