You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							112 lines
						
					
					
						
							3.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							112 lines
						
					
					
						
							3.9 KiB
						
					
					
				
								package sub_coordinator
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"fmt"
							 | 
						|
									cmap "github.com/orcaman/concurrent-map/v2"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/filer_client"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/mq/topic"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								type ConsumerGroup struct {
							 | 
						|
									topic topic.Topic
							 | 
						|
									// map a consumer group instance id to a consumer group instance
							 | 
						|
									ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
							 | 
						|
									Market                 *Market
							 | 
						|
									reBalanceTimer         *time.Timer
							 | 
						|
									filerClientAccessor    *filer_client.FilerClientAccessor
							 | 
						|
									stopCh                 chan struct{}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup {
							 | 
						|
									cg := &ConsumerGroup{
							 | 
						|
										topic:                  topic.FromPbTopic(t),
							 | 
						|
										ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
							 | 
						|
										filerClientAccessor:    filerClientAccessor,
							 | 
						|
										stopCh:                 make(chan struct{}),
							 | 
						|
									}
							 | 
						|
									if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
							 | 
						|
										var partitions []topic.Partition
							 | 
						|
										for _, assignment := range conf.BrokerPartitionAssignments {
							 | 
						|
											partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
							 | 
						|
										}
							 | 
						|
										cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
							 | 
						|
									} else {
							 | 
						|
										glog.V(0).Infof("fail to read topic conf from filer: %v", err)
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										for {
							 | 
						|
											select {
							 | 
						|
											case adjustment := <-cg.Market.AdjustmentChan:
							 | 
						|
												cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
							 | 
						|
												if !found {
							 | 
						|
													glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
												if adjustment.isAssign {
							 | 
						|
													if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
							 | 
						|
														for _, assignment := range conf.BrokerPartitionAssignments {
							 | 
						|
															if adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
							 | 
						|
																cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
							 | 
						|
																	Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
							 | 
						|
																		Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
							 | 
						|
																			PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
							 | 
						|
																				Partition:      adjustment.partition.ToPbPartition(),
							 | 
						|
																				LeaderBroker:   assignment.LeaderBroker,
							 | 
						|
																				FollowerBroker: assignment.FollowerBroker,
							 | 
						|
																			},
							 | 
						|
																		},
							 | 
						|
																	},
							 | 
						|
																}
							 | 
						|
																glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
							 | 
						|
																break
							 | 
						|
															}
							 | 
						|
														}
							 | 
						|
													}
							 | 
						|
												} else {
							 | 
						|
													cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
							 | 
						|
														Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
							 | 
						|
															UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
							 | 
						|
																Partition: adjustment.partition.ToPbPartition(),
							 | 
						|
															},
							 | 
						|
														},
							 | 
						|
													}
							 | 
						|
													glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
							 | 
						|
												}
							 | 
						|
											case <-cg.stopCh:
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									return cg
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
							 | 
						|
									fmt.Printf("ack assignment %v\n", assignment)
							 | 
						|
									cg.Market.ConfirmAdjustment(&Adjustment{
							 | 
						|
										consumer:  cgi.InstanceId,
							 | 
						|
										partition: topic.FromPbPartition(assignment.Partition),
							 | 
						|
										isAssign:  true,
							 | 
						|
									})
							 | 
						|
								}
							 | 
						|
								func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
							 | 
						|
									fmt.Printf("ack unassignment %v\n", assignment)
							 | 
						|
									cg.Market.ConfirmAdjustment(&Adjustment{
							 | 
						|
										consumer:  cgi.InstanceId,
							 | 
						|
										partition: topic.FromPbPartition(assignment.Partition),
							 | 
						|
										isAssign:  false,
							 | 
						|
									})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (cg *ConsumerGroup) Shutdown() {
							 | 
						|
									close(cg.stopCh)
							 | 
						|
								}
							 |