Browse Source

consumer instance passing MaxPartitionCount to coordinator

pull/5890/head
chrislu 8 months ago
parent
commit
372bd8d71d
  1. 1
      weed/mq/client/sub_client/connect_to_sub_coordinator.go
  2. 2
      weed/mq/client/sub_client/subscriber.go
  3. 1
      weed/mq/sub_coordinator/consumer_group.go
  4. 1
      weed/mq/sub_coordinator/coordinator.go
  5. 1
      weed/pb/mq.proto
  6. 17
      weed/pb/mq_pb/mq.pb.go

1
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -51,6 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId, ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(), Topic: sub.ContentConfig.Topic.ToPbTopic(),
MaxPartitionCount: sub.ProcessorConfig.ConcurrentPartitionLimit,
}, },
}, },
}); err != nil { }); err != nil {

2
weed/mq/client/sub_client/subscriber.go

@ -21,7 +21,7 @@ type ContentConfiguration struct {
} }
type ProcessorConfiguration struct { type ProcessorConfiguration struct {
ConcurrentPartitionLimit int // how many partitions to process concurrently
ConcurrentPartitionLimit int32 // how many partitions to process concurrently
} }
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error) type OnEachMessageFunc func(key, value []byte) (shouldContinue bool, err error)

1
weed/mq/sub_coordinator/consumer_group.go

@ -14,6 +14,7 @@ type ConsumerGroupInstance struct {
// the consumer group instance may not have an active partition // the consumer group instance may not have an active partition
Partitions []*topic.Partition Partitions []*topic.Partition
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
MaxPartitionCount int32
} }
type ConsumerGroup struct { type ConsumerGroup struct {
topic topic.Topic topic topic.Topic

1
weed/mq/sub_coordinator/coordinator.go

@ -67,6 +67,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato
cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId) cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
} }
} }
cgi.MaxPartitionCount = initMessage.MaxPartitionCount
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic) cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
return cgi return cgi
} }

1
weed/pb/mq.proto

@ -163,6 +163,7 @@ message SubscriberToSubCoordinatorRequest {
string consumer_group = 1; string consumer_group = 1;
string consumer_group_instance_id = 2; string consumer_group_instance_id = 2;
Topic topic = 3; Topic topic = 3;
int32 max_partition_count = 4;
} }
message AckMessage { message AckMessage {
Partition partition = 1; Partition partition = 1;

17
weed/pb/mq_pb/mq.pb.go

@ -2195,6 +2195,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct {
ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
ConsumerGroupInstanceId string `protobuf:"bytes,2,opt,name=consumer_group_instance_id,json=consumerGroupInstanceId,proto3" json:"consumer_group_instance_id,omitempty"` ConsumerGroupInstanceId string `protobuf:"bytes,2,opt,name=consumer_group_instance_id,json=consumerGroupInstanceId,proto3" json:"consumer_group_instance_id,omitempty"`
Topic *Topic `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` Topic *Topic `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
MaxPartitionCount int32 `protobuf:"varint,4,opt,name=max_partition_count,json=maxPartitionCount,proto3" json:"max_partition_count,omitempty"`
} }
func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() { func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() {
@ -2250,6 +2251,13 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) GetTopic() *Topic {
return nil return nil
} }
func (x *SubscriberToSubCoordinatorRequest_InitMessage) GetMaxPartitionCount() int32 {
if x != nil {
return x.MaxPartitionCount
}
return 0
}
type SubscriberToSubCoordinatorRequest_AckMessage struct { type SubscriberToSubCoordinatorRequest_AckMessage struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -2946,7 +2954,7 @@ var file_mq_proto_rawDesc = []byte{
0x28, 0x08, 0x52, 0x0a, 0x69, 0x73, 0x44, 0x72, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x1f, 0x28, 0x08, 0x52, 0x0a, 0x69, 0x73, 0x44, 0x72, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x1f,
0x0a, 0x1d, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x0a, 0x1d, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72,
0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0xca, 0x03, 0x0a, 0x21, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f,
0xfa, 0x03, 0x0a, 0x21, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f,
0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
@ -2958,7 +2966,7 @@ var file_mq_proto_rawDesc = []byte{
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54,
0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, 0x1a, 0x9c, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69,
0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, 0x1a, 0xcc, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69,
0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73,
0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12,
@ -2968,7 +2976,10 @@ var file_mq_proto_rawDesc = []byte{
0x75, 0x70, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x75, 0x70, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05,
0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63,
0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x1a, 0x58, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65,
0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x2e, 0x0a, 0x13, 0x6d, 0x61, 0x78, 0x5f, 0x70,
0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04,
0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x6d, 0x61, 0x78, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x58, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,

Loading…
Cancel
Save