Browse Source

balance subscribers

need to ensure the offsets are persisted
pull/5890/head
chrislu 7 months ago
parent
commit
edc87c16f9
  1. 5
      weed/mq/broker/broker_grpc_sub.go
  2. 12
      weed/mq/broker/broker_grpc_sub_coordinator.go
  3. 4
      weed/mq/broker/broker_server.go
  4. 1
      weed/mq/client/sub_client/connect_to_sub_coordinator.go
  5. 4
      weed/mq/client/sub_client/on_each_partition.go
  6. 7
      weed/mq/client/sub_client/subscribe.go
  7. 4
      weed/mq/pub_balancer/pub_balancer.go
  8. 163
      weed/mq/sub_coordinator/consumer_group.go
  9. 8
      weed/mq/sub_coordinator/consumer_group_instance.go
  10. 22
      weed/mq/sub_coordinator/market.go
  11. 35
      weed/mq/sub_coordinator/sub_coordinator.go
  12. 6
      weed/pb/mq.proto
  13. 990
      weed/pb/mq_pb/mq.pb.go

5
weed/mq/broker/broker_grpc_sub.go

@ -94,6 +94,11 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
ack, err := stream.Recv() ack, err := stream.Recv()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
IsEndOfStream: true,
},
}})
break break
} }
glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err) glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)

12
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -20,10 +20,14 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
} }
var cgi *sub_coordinator.ConsumerGroupInstance var cgi *sub_coordinator.ConsumerGroupInstance
var cg *sub_coordinator.ConsumerGroup
// process init message // process init message
initMessage := req.GetInit() initMessage := req.GetInit()
if initMessage != nil { if initMessage != nil {
cgi = b.SubCoordinator.AddSubscriber(initMessage)
cg, cgi, err = b.SubCoordinator.AddSubscriber(initMessage)
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
}
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else { } else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty") return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
@ -45,7 +49,11 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil { if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil {
glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment) glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
cgi.AckUnAssignment(ackUnAssignment)
cg.AckUnAssignment(cgi, ackUnAssignment)
}
if ackAssignment := req.GetAckAssignment(); ackAssignment != nil {
glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
cg.AckAssignment(cgi, ackAssignment)
} }
select { select {

4
weed/mq/broker/broker_server.go

@ -53,7 +53,7 @@ type MessageQueueBroker struct {
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
pubBalancer := pub_balancer.NewPubBalancer() pubBalancer := pub_balancer.NewPubBalancer()
subCoordinator := sub_coordinator.NewSubCoordinator(pubBalancer)
subCoordinator := sub_coordinator.NewSubCoordinator()
mqBroker = &MessageQueueBroker{ mqBroker = &MessageQueueBroker{
option: option, option: option,
@ -73,8 +73,6 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange
pubBalancer.OnAddBroker = mqBroker.SubCoordinator.OnSubAddBroker
pubBalancer.OnRemoveBroker = mqBroker.SubCoordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster() go mqBroker.MasterClient.KeepConnectedToMaster()

1
weed/mq/client/sub_client/connect_to_sub_coordinator.go

@ -58,6 +58,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
go func() { go func() {
for reply := range sub.brokerPartitionAssignmentAckChan { for reply := range sub.brokerPartitionAssignmentAckChan {
glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
if err := stream.Send(reply); err != nil { if err := stream.Send(reply); err != nil {
glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
return return

4
weed/mq/client/sub_client/on_each_partition.go

@ -64,7 +64,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
for { for {
select { select {
case <-stopCh: case <-stopCh:
break
subscribeClient.CloseSend()
return
case ack := <-partitionOffsetChan: case ack := <-partitionOffsetChan:
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{ Message: &mq_pb.SubscribeMessageRequest_Ack{
@ -76,7 +77,6 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}) })
} }
} }
subscribeClient.CloseSend()
}() }()
var lastErr error var lastErr error

7
weed/mq/client/sub_client/subscribe.go

@ -59,6 +59,13 @@ func (sub *TopicSubscriber) startProcessors() {
wg.Done() wg.Done()
}() }()
glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
Partition: assigned.Partition,
},
},
}
err := sub.onEachPartition(assigned, stopChan) err := sub.onEachPartition(assigned, stopChan)
if err != nil { if err != nil {
glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)

4
weed/mq/pub_balancer/pub_balancer.go

@ -33,8 +33,6 @@ type PubBalancer struct {
// Collected from all brokers when they connect to the broker leader // Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
OnAddBroker func(broker string, brokerStats *BrokerStats)
OnRemoveBroker func(broker string, brokerStats *BrokerStats)
} }
func NewPubBalancer() *PubBalancer { func NewPubBalancer() *PubBalancer {
@ -54,7 +52,6 @@ func (balancer *PubBalancer) AddBroker(broker string) (brokerStats *BrokerStats)
} }
} }
balancer.onPubAddBroker(broker, brokerStats) balancer.onPubAddBroker(broker, brokerStats)
balancer.OnAddBroker(broker, brokerStats)
return brokerStats return brokerStats
} }
@ -75,7 +72,6 @@ func (balancer *PubBalancer) RemoveBroker(broker string, stats *BrokerStats) {
} }
} }
balancer.onPubRemoveBroker(broker, stats) balancer.onPubRemoveBroker(broker, stats)
balancer.OnRemoveBroker(broker, stats)
} }
func (balancer *PubBalancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) { func (balancer *PubBalancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {

163
weed/mq/sub_coordinator/consumer_group.go

@ -1,9 +1,9 @@
package sub_coordinator package sub_coordinator
import ( import (
"fmt"
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time" "time"
@ -13,128 +13,97 @@ type ConsumerGroup struct {
topic topic.Topic topic topic.Topic
// map a consumer group instance id to a consumer group instance // map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance] ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
Market *Market
reBalanceTimer *time.Timer reBalanceTimer *time.Timer
pubBalancer *pub_balancer.PubBalancer
filerClientAccessor *FilerClientAccessor filerClientAccessor *FilerClientAccessor
stopCh chan struct{}
} }
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.PubBalancer, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
return &ConsumerGroup{
func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *FilerClientAccessor) *ConsumerGroup {
cg := &ConsumerGroup{
topic: topic.FromPbTopic(t), topic: topic.FromPbTopic(t),
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](), ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
pubBalancer: pubBalancer,
filerClientAccessor: filerClientAccessor, filerClientAccessor: filerClientAccessor,
} }
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(false, "remove consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
if maxPartitionCount == 0 {
maxPartitionCount = 1
}
if rebalanceSeconds == 0 {
rebalanceSeconds = 10
}
if isAdd {
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
var sumMaxPartitionCount int32
for _, cgi := range cg.ConsumerGroupInstances.Items() {
sumMaxPartitionCount += cgi.MaxPartitionCount
}
if sumMaxPartitionCount < int32(len(conf.BrokerPartitionAssignments)) && sumMaxPartitionCount+maxPartitionCount >= int32(len(conf.BrokerPartitionAssignments)) {
partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
for _, assignment := range conf.BrokerPartitionAssignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker, assignment.FollowerBroker)
}
cg.BalanceConsumerGroupInstances(partitionSlotToBrokerList, reason)
return
}
}
}
cg.reBalanceTimer = time.AfterFunc(time.Duration(rebalanceSeconds)*time.Second, func() {
cg.BalanceConsumerGroupInstances(nil, reason)
cg.reBalanceTimer = nil
})
}
func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
for _, assignment := range assignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker, assignment.FollowerBroker)
}
cg.BalanceConsumerGroupInstances(partitionSlotToBrokerList, "partition list change")
}
func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
// collect current topic partitions
partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if partitionSlotToBrokerList == nil {
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil { if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
partitionSlotToBrokerList = pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
var partitions []topic.Partition
for _, assignment := range conf.BrokerPartitionAssignments { for _, assignment := range conf.BrokerPartitionAssignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker, assignment.FollowerBroker)
partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
} }
cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
} else { } else {
glog.V(0).Infof("fail to read topic conf from filer: %v", err) glog.V(0).Infof("fail to read topic conf from filer: %v", err)
return
}
return nil
} }
// collect current consumer group instance ids
var consumerInstances []*ConsumerGroupInstance
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
consumerInstances = append(consumerInstances, consumerGroupInstance)
}
cg.mapping.BalanceToConsumerInstances(partitionSlotToBrokerList, consumerInstances)
// convert cg.mapping currentMapping to map of consumer group instance id to partition slots
consumerInstanceToPartitionSlots := make(map[ConsumerGroupInstanceId][]*PartitionSlotToConsumerInstance)
for _, partitionSlot := range cg.mapping.currentMapping.PartitionSlots {
consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId] = append(consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId], partitionSlot)
}
// notify consumer group instances
for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
partitionSlots, found := consumerInstanceToPartitionSlots[consumerGroupInstance.InstanceId]
go func() {
for {
select {
case adjustment := <-cg.Market.AdjustmentChan:
cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
if !found { if !found {
partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
continue
} }
for _, partitionSlot := range partitionSlots {
consumerGroupInstance.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
if adjustment.isAssign {
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
for _, assignment := range conf.BrokerPartitionAssignments {
if adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{ Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{ Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
PartitionAssignment: &mq_pb.BrokerPartitionAssignment{ PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: partitionSlot.UnixTimeNs,
Partition: adjustment.partition.ToPbPartition(),
LeaderBroker: assignment.LeaderBroker,
FollowerBroker: assignment.FollowerBroker,
}, },
LeaderBroker: partitionSlot.Broker,
FollowerBroker: partitionSlot.FollowerBroker,
}, },
}, },
}
glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
break
}
}
}
} else {
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
Partition: adjustment.partition.ToPbPartition(),
},
}, },
} }
glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
}
case <-cg.stopCh:
return
} }
} }
}()
return cg
}
func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
fmt.Printf("ack assignment %v\n", assignment)
cg.Market.ConfirmAdjustment(&Adjustment{
consumer: cgi.InstanceId,
partition: topic.FromPbPartition(assignment.Partition),
isAssign: true,
})
}
func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
fmt.Printf("ack unassignment %v\n", assignment)
cg.Market.ConfirmAdjustment(&Adjustment{
consumer: cgi.InstanceId,
partition: topic.FromPbPartition(assignment.Partition),
isAssign: false,
})
}
func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
}
func (cg *ConsumerGroup) Shutdown() {
close(cg.stopCh)
} }

8
weed/mq/sub_coordinator/consumer_group_instance.go

@ -1,7 +1,6 @@
package sub_coordinator package sub_coordinator
import ( import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
) )
@ -15,13 +14,10 @@ type ConsumerGroupInstance struct {
MaxPartitionCount int32 MaxPartitionCount int32
} }
func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
func NewConsumerGroupInstance(instanceId string, maxPartitionCount int32) *ConsumerGroupInstance {
return &ConsumerGroupInstance{ return &ConsumerGroupInstance{
InstanceId: ConsumerGroupInstanceId(instanceId), InstanceId: ConsumerGroupInstanceId(instanceId),
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
MaxPartitionCount: maxPartitionCount,
} }
} }
func (i ConsumerGroupInstance) AckUnAssignment(assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
fmt.Printf("ack unassignment %v\n", assignment)
}

22
weed/mq/sub_coordinator/market.go

@ -158,6 +158,7 @@ func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) {
} }
if bestConsumer != nil { if bestConsumer != nil {
// change consumer assigned partitions later when the adjustment is confirmed
adjustment := &Adjustment{ adjustment := &Adjustment{
isAssign: true, isAssign: true,
partition: partition.Partition, partition: partition.Partition,
@ -187,6 +188,8 @@ func (m *Market) loopBalanceLoad() {
m.inflightAdjustments = inflightAdjustments m.inflightAdjustments = inflightAdjustments
m.doBalanceLoad() m.doBalanceLoad()
println("Balance load completed.")
m.Status()
} }
case <-m.balanceRequestChan: case <-m.balanceRequestChan:
m.hasBalanceRequest = true m.hasBalanceRequest = true
@ -288,6 +291,8 @@ func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
} else { } else {
m.unassignPartitionSlot(adjustment.partition) m.unassignPartitionSlot(adjustment.partition)
} }
glog.V(0).Infof("ConfirmAdjustment %+v", adjustment)
m.Status()
} }
func (m *Market) unassignPartitionSlot(partition topic.Partition) { func (m *Market) unassignPartitionSlot(partition topic.Partition) {
@ -344,3 +349,20 @@ func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInsta
consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition) consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition)
} }
func (m *Market) Status() {
m.mu.Lock()
defer m.mu.Unlock()
glog.V(0).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances))
for partition, slot := range m.partitions {
if slot.AssignedTo == nil {
glog.V(0).Infof("Partition %+v is not assigned to any consumer", partition)
} else {
glog.V(0).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId)
}
}
for _, consumer := range m.consumerInstances {
glog.V(0).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions))
}
}

35
weed/mq/sub_coordinator/sub_coordinator.go

@ -1,8 +1,8 @@
package sub_coordinator package sub_coordinator
import ( import (
"fmt"
cmap "github.com/orcaman/concurrent-map/v2" cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
) )
@ -18,14 +18,12 @@ type TopicConsumerGroups struct {
type SubCoordinator struct { type SubCoordinator struct {
// map topic name to consumer groups // map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups] TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.PubBalancer
FilerClientAccessor *FilerClientAccessor FilerClientAccessor *FilerClientAccessor
} }
func NewSubCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
func NewSubCoordinator() *SubCoordinator {
return &SubCoordinator{ return &SubCoordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](), TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,
} }
} }
@ -52,25 +50,29 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName return topicName
} }
func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance {
func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) (*ConsumerGroup, *ConsumerGroupInstance, error) {
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true) tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil { if cg == nil {
cg = NewConsumerGroup(initMessage.Topic, c.balancer, c.FilerClientAccessor)
if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) {
cg = NewConsumerGroup(initMessage.Topic, initMessage.RebalanceSeconds, c.FilerClientAccessor)
if cg != nil {
tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg)
}
cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup) cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
} }
if cg == nil {
return nil, nil, fmt.Errorf("fail to create consumer group %s: topic %s not found", initMessage.ConsumerGroup, initMessage.Topic)
} }
cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId) cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
if cgi == nil { if cgi == nil {
cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId)
cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.MaxPartitionCount)
if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) { if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId) cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
} }
} }
cgi.MaxPartitionCount = initMessage.MaxPartitionCount cgi.MaxPartitionCount = initMessage.MaxPartitionCount
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic, initMessage.MaxPartitionCount, initMessage.RebalanceSeconds)
return cgi
cg.Market.AddConsumerInstance(cgi)
return cg, cgi, nil
} }
func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) { func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
@ -83,9 +85,10 @@ func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoor
return return
} }
cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId) cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic, initMessage.MaxPartitionCount, initMessage.RebalanceSeconds)
cg.Market.RemoveConsumerInstance(ConsumerGroupInstanceId(initMessage.ConsumerGroupInstanceId))
if cg.ConsumerGroupInstances.Count() == 0 { if cg.ConsumerGroupInstances.Count() == 0 {
tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup) tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
cg.Shutdown()
} }
if tcg.ConsumerGroups.Count() == 0 { if tcg.ConsumerGroups.Count() == 0 {
c.RemoveTopic(initMessage.Topic) c.RemoveTopic(initMessage.Topic)
@ -101,13 +104,3 @@ func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq
cg.OnPartitionListChange(assignments) cg.OnPartitionListChange(assignments)
} }
} }
// OnSubAddBroker is called when a broker is added to the balancer
func (c *SubCoordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}
// OnSubRemoveBroker is called when a broker is removed from the balancer
func (c *SubCoordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}

6
weed/pb/mq.proto

@ -180,9 +180,13 @@ message SubscriberToSubCoordinatorRequest {
message AckUnAssignmentMessage { message AckUnAssignmentMessage {
Partition partition = 1; Partition partition = 1;
} }
message AckAssignmentMessage {
Partition partition = 1;
}
oneof message { oneof message {
InitMessage init = 1; InitMessage init = 1;
AckUnAssignmentMessage ack_un_assignment = 2;
AckAssignmentMessage ack_assignment = 2;
AckUnAssignmentMessage ack_un_assignment = 3;
} }
} }
message SubscriberToSubCoordinatorResponse { message SubscriberToSubCoordinatorResponse {

990
weed/pb/mq_pb/mq.pb.go
File diff suppressed because it is too large
View File

Loading…
Cancel
Save