Browse Source

add subscriber coordinator

pull/4887/head
chrislu 1 year ago
parent
commit
bf13f3ced7
  1. 5
      weed/mq/client/sub_client/subscribe.go
  2. 12
      weed/mq/client/sub_client/subscriber.go
  3. 92
      weed/mq/coordinator/consumer_group.go
  4. 36
      weed/mq/coordinator/coordinator.go
  5. 17
      weed/mq/topic/partition.go

5
weed/mq/client/sub_client/subscribe.go

@ -11,9 +11,14 @@ import (
func (sub *TopicSubscriber) Subscribe() error { func (sub *TopicSubscriber) Subscribe() error {
util.RetryUntil("subscribe", func() error { util.RetryUntil("subscribe", func() error {
// ask balancer for brokers of the topic
if err := sub.doLookup(sub.bootstrapBroker); err != nil { if err := sub.doLookup(sub.bootstrapBroker); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
} }
// treat the first broker as the topic leader
// connect to the leader broker
// subscribe to the topic
if err := sub.doProcess(); err != nil { if err := sub.doProcess(); err != nil {
return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
} }

12
weed/mq/client/sub_client/subscriber.go

@ -6,11 +6,13 @@ import (
) )
type SubscriberConfiguration struct { type SubscriberConfiguration struct {
ClientId string
GroupId string
GroupInstanceId string
BootstrapServers []string
GrpcDialOption grpc.DialOption
ClientId string
GroupId string
GroupInstanceId string
GroupMinimumPeers int32
GroupMaximumPeers int32
BootstrapServers []string
GrpcDialOption grpc.DialOption
} }
type ContentConfiguration struct { type ContentConfiguration struct {

92
weed/mq/coordinator/consumer_group.go

@ -0,0 +1,92 @@
package coordinator
import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"sync"
)
func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
cg.MinimumActiveInstances = min
cg.MaximumActiveInstances = max
}
func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
cgi := &ConsumerGroupInstance{
ClientId: clientId,
}
cg.ConsumerGroupInstances.Set(clientId, cgi)
return cgi
}
func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
cg.ConsumerGroupInstances.Remove(clientId)
}
func (cg *ConsumerGroup) CoordinateIfNeeded() {
emptyInstanceCount, activeInstanceCount := int32(0), int32(0)
for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
if cgi.Val.Partition == nil {
// this consumer group instance is not assigned a partition
// need to assign one
emptyInstanceCount++
} else {
activeInstanceCount++
}
}
var delta int32
if emptyInstanceCount > 0 {
if cg.MinimumActiveInstances <= 0 {
// need to assign more partitions
delta = emptyInstanceCount
} else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances {
// need to assign more partitions
delta = cg.MinimumActiveInstances - activeInstanceCount
}
}
if cg.MaximumActiveInstances > 0 {
if activeInstanceCount > cg.MaximumActiveInstances {
// need to remove some partitions
delta = cg.MaximumActiveInstances - activeInstanceCount
}
}
if delta == 0 {
return
}
cg.doCoordinate(activeInstanceCount + delta)
}
func (cg *ConsumerGroup) doCoordinate(target int32) {
// stop existing instances from processing
var wg sync.WaitGroup
for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
if cgi.Val.Partition != nil {
wg.Add(1)
go func(cgi *ConsumerGroupInstance) {
defer wg.Done()
// stop processing
// flush internal state
// wait for all messages to be processed
// close the connection
}(cgi.Val)
}
}
wg.Wait()
partitions := topic.SplitPartitions(target)
// assign partitions to new instances
i := 0
for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
cgi.Val.Partition = partitions[i]
i++
wg.Add(1)
go func(cgi *ConsumerGroupInstance) {
defer wg.Done()
// start processing
// start consuming from the last offset
}(cgi.Val)
}
wg.Wait()
}

36
weed/mq/coordinator/coordinator.go

@ -0,0 +1,36 @@
package coordinator
import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
)
type ConsumerGroupInstance struct {
ClientId string
// the consumer group instance may not have an active partition
Partition *topic.Partition
// processed message count
ProcessedMessageCount int64
}
type ConsumerGroup struct {
// map a client id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
MinimumActiveInstances int32
MaximumActiveInstances int32
}
type TopicConsumerGroups struct {
// map a consumer group name to a consumer group
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
}
// Coordinator coordinates the instances in the consumer group for one topic.
// It is responsible for:
// 1. Assigning partitions to consumer instances.
// 2. Reassigning partitions when a consumer instance is down.
// 3. Reassigning partitions when a consumer instance is up.
type Coordinator struct {
// map client id to subscriber
Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance]
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups]
}

17
weed/mq/topic/partition.go

@ -30,3 +30,20 @@ func FromPbPartition(partition *mq_pb.Partition) Partition {
RingSize: partition.RingSize, RingSize: partition.RingSize,
} }
} }
func SplitPartitions(targetCount int32) []*Partition {
partitions := make([]*Partition, 0, targetCount)
partitionSize := PartitionCount / targetCount
for i := int32(0); i < targetCount; i++ {
partitionStop := (i + 1) * partitionSize
if i == targetCount-1 {
partitionStop = PartitionCount
}
partitions = append(partitions, &Partition{
RangeStart: i * partitionSize,
RangeStop: partitionStop,
RingSize: PartitionCount,
})
}
return partitions
}
Loading…
Cancel
Save