
subscriber receives partitions and dispatches them to processors

chrislu committed 8 months ago · commit 1f20178ded · pull/5890/head
Changed files:

  1. weed/mq/client/sub_client/connect_to_sub_coordinator.go (30 lines changed)
  2. weed/mq/client/sub_client/subscribe.go (77 lines changed)
  3. weed/mq/client/sub_client/subscriber.go (23 lines changed)
  4. weed/mq/topic/partition.go (10 lines changed)

weed/mq/client/sub_client/connect_to_sub_coordinator.go (30 lines changed)

@@ -7,7 +7,6 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"io"
-	"sync"
 	"time"
 )
@@ -69,8 +68,10 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
 			assignment := resp.GetAssignment()
 			if assignment != nil {
 				glog.V(0).Infof("subscriber %s receive assignment: %v", sub.ContentConfig.Topic, assignment)
+				for _, assignedPartition := range assignment.PartitionAssignments {
+					sub.brokerPartitionAssignmentChan <- assignedPartition
+				}
 			}
-			sub.onEachAssignment(assignment)
 		}
 		return nil
@@ -84,31 +85,6 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
 	}
 }
-func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCoordinatorResponse_Assignment) {
-	if assignment == nil {
-		return
-	}
-	// process each partition, with a concurrency limit
-	var wg sync.WaitGroup
-	semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
-	for _, assigned := range assignment.PartitionAssignments {
-		wg.Add(1)
-		semaphore <- struct{}{}
-		go func(assigned *mq_pb.BrokerPartitionAssignment) {
-			defer wg.Done()
-			defer func() { <-semaphore }()
-			glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
-			err := sub.onEachPartition(assigned)
-			if err != nil {
-				glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
-			}
-		}(assigned)
-	}
-	wg.Wait()
-}
 func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
 	// connect to the partition broker
 	return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
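
Note: with this change the coordinator stream handler no longer processes partitions inline. It only pushes each BrokerPartitionAssignment into the subscriber's buffered channel, and the worker-pool logic that used to live in onEachAssignment moves to subscribe.go. Below is a minimal, self-contained sketch of that hand-off pattern (all names are illustrative stand-ins, not the SeaweedFS types), showing why a buffered channel keeps the keep-alive stream responsive even when processors are slow:

// sketch: dispatch assignments through a buffered channel (illustrative names)
package main

import "fmt"

type Assignment struct{ Partition string }

func main() {
	// buffered, like brokerPartitionAssignmentChan, so the producer rarely blocks
	assignmentChan := make(chan *Assignment, 1024)

	done := make(chan struct{})
	go func() { // consumer: drains the channel and starts per-partition work
		defer close(done)
		for a := range assignmentChan {
			fmt.Println("dispatch processor for", a.Partition)
		}
	}()

	// producer: the coordinator stream loop only enqueues and moves on
	for _, a := range []*Assignment{{"p0"}, {"p1"}} {
		assignmentChan <- a
	}
	close(assignmentChan)
	<-done
}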

weed/mq/client/sub_client/subscribe.go (77 lines changed)

@@ -1,11 +1,88 @@
 package sub_client
+
+import (
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"sync"
+	"time"
+)
+
+type ProcessorState struct {
+}
+
 // Subscribe subscribes to a topic's specified partitions.
 // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
 func (sub *TopicSubscriber) Subscribe() error {
+	go sub.startProcessors()
+
 	// loop forever
 	sub.doKeepConnectedToSubCoordinator()
 	return nil
 }
+
+func (sub *TopicSubscriber) startProcessors() {
+	// listen for assignments from the sub coordinator and start one processor per partition
+	var wg sync.WaitGroup
+	semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
+
+	for assigned := range sub.brokerPartitionAssignmentChan {
+		wg.Add(1)
+		semaphore <- struct{}{}
+		topicPartition := topic.FromPbPartition(assigned.Partition)
+
+		// wait until no overlapping partition is still in progress
+		sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
+
+		// start a processor
+		sub.activeProcessorsLock.Lock()
+		sub.activeProcessors[topicPartition] = &ProcessorState{}
+		sub.activeProcessorsLock.Unlock()
+
+		go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
+			defer func() {
+				sub.activeProcessorsLock.Lock()
+				delete(sub.activeProcessors, topicPartition)
+				sub.activeProcessorsLock.Unlock()
+				<-semaphore
+				wg.Done()
+			}()
+			glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+			err := sub.onEachPartition(assigned)
+			if err != nil {
+				glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+			} else {
+				glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+			}
+		}(assigned, topicPartition)
+	}
+	wg.Wait()
+}
+
+func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
+	foundOverlapping := true
+	for foundOverlapping {
+		sub.activeProcessorsLock.Lock()
+		foundOverlapping = false
+		for partition := range sub.activeProcessors {
+			if partition.Overlaps(topicPartition) {
+				foundOverlapping = true
+				break
+			}
+		}
+		sub.activeProcessorsLock.Unlock()
+		if foundOverlapping {
+			glog.V(0).Infof("subscriber %s/%s waiting for partition %+v to complete", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, topicPartition)
+			time.Sleep(1 * time.Second)
+		}
+	}
+}
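
Note: startProcessors combines two controls: a semaphore channel that caps in-flight partition processors at ConcurrentPartitionLimit, and a poll-and-sleep gate that holds a new processor until no active processor covers an overlapping partition range. A standalone sketch of that combination, with ranges modeled as plain integers and every name invented for illustration:

// sketch: bounded concurrency plus an overlap gate (illustrative names)
package main

import (
	"fmt"
	"sync"
	"time"
)

type prange struct{ start, stop int32 }

// same half-open test as Partition.Overlaps below
func overlaps(a, b prange) bool {
	return a.start < b.stop && b.start < a.stop
}

func main() {
	const limit = 2
	semaphore := make(chan struct{}, limit) // caps in-flight processors
	active := map[prange]bool{}
	var mu sync.Mutex
	var wg sync.WaitGroup

	work := []prange{{0, 512}, {512, 1024}, {0, 1024}} // the third overlaps both

	for _, p := range work {
		wg.Add(1)
		semaphore <- struct{}{} // acquire a processor slot

		// wait until no overlapping range is in flight, then register p
		for {
			mu.Lock()
			blocked := false
			for q := range active {
				if overlaps(p, q) {
					blocked = true
					break
				}
			}
			if !blocked {
				active[p] = true
				mu.Unlock()
				break
			}
			mu.Unlock()
			time.Sleep(10 * time.Millisecond)
		}

		go func(p prange) {
			defer func() {
				mu.Lock()
				delete(active, p)
				mu.Unlock()
				<-semaphore // release the slot
				wg.Done()
			}()
			fmt.Printf("processing [%d,%d)\n", p.start, p.stop)
			time.Sleep(20 * time.Millisecond)
		}(p)
	}
	wg.Wait()
}

The 1-second poll in waitUntilNoOverlappingPartitionInFlight trades a little reassignment latency for simplicity; a sync.Cond broadcast on processor exit would wake waiters immediately, at the cost of more moving parts.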

weed/mq/client/sub_client/subscriber.go (23 lines changed)

@@ -4,6 +4,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"google.golang.org/grpc"
+	"sync"
 	"time"
 )
@@ -30,23 +31,27 @@ type OnCompletionFunc func()
 type TopicSubscriber struct {
 	SubscriberConfig *SubscriberConfiguration
 	ContentConfig    *ContentConfiguration
-	ProcessorConfig  *ProcessorConfiguration
-	brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
-	OnEachMessageFunc OnEachMessageFunc
+	ProcessorConfig  *ProcessorConfiguration
+	brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment
+	OnEachMessageFunc OnEachMessageFunc
 	OnCompletionFunc OnCompletionFunc
 	bootstrapBrokers []string
 	waitForMoreMessage bool
 	alreadyProcessedTsNs int64
+	activeProcessors     map[topic.Partition]*ProcessorState
+	activeProcessorsLock sync.Mutex
 }

 func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
 	return &TopicSubscriber{
-		SubscriberConfig:     subscriber,
-		ContentConfig:        content,
-		ProcessorConfig:      &processor,
-		bootstrapBrokers:     bootstrapBrokers,
-		waitForMoreMessage:   true,
-		alreadyProcessedTsNs: content.StartTime.UnixNano(),
+		SubscriberConfig:              subscriber,
+		ContentConfig:                 content,
+		ProcessorConfig:               &processor,
+		brokerPartitionAssignmentChan: make(chan *mq_pb.BrokerPartitionAssignment, 1024),
+		bootstrapBrokers:              bootstrapBrokers,
+		waitForMoreMessage:            true,
+		alreadyProcessedTsNs:          content.StartTime.UnixNano(),
+		activeProcessors:              make(map[topic.Partition]*ProcessorState),
 	}
 }
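
Note: a hypothetical usage sketch of the updated constructor follows. Only the identifiers that appear in this diff (NewTopicSubscriber, ConsumerGroup, GrpcDialOption, StartTime, ConcurrentPartitionLimit, Subscribe) are taken from the code; the broker address and field values are invented, and the configuration structs may have additional required fields not visible here.

package main

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	subscriber := sub_client.NewTopicSubscriber(
		[]string{"localhost:17777"}, // bootstrap brokers; address is illustrative
		&sub_client.SubscriberConfiguration{
			ConsumerGroup:  "test-group",
			GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
		},
		&sub_client.ContentConfiguration{
			// Topic selects what to read; its exact type is defined elsewhere in the package
			StartTime: time.Now(),
		},
		sub_client.ProcessorConfiguration{
			ConcurrentPartitionLimit: 4, // bounds the semaphore in startProcessors
		},
	)
	// Subscribe launches startProcessors, then blocks in the coordinator keep-alive loop;
	// assignments flow in over brokerPartitionAssignmentChan (buffered at 1024).
	_ = subscriber.Subscribe()
}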

weed/mq/topic/partition.go (10 lines changed)

@@ -71,3 +71,13 @@ func (partition Partition) ToPbPartition() *mq_pb.Partition {
 		UnixTimeNs: partition.UnixTimeNs,
 	}
 }
+
+func (partition Partition) Overlaps(partition2 Partition) bool {
+	if partition.RangeStart >= partition2.RangeStop {
+		return false
+	}
+	if partition.RangeStop <= partition2.RangeStart {
+		return false
+	}
+	return true
+}
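
Note: Overlaps treats a partition's [RangeStart, RangeStop) as a half-open interval, so partitions that merely touch (one's RangeStop equals the other's RangeStart) are not considered overlapping, while partial overlap and containment are. A quick check of those semantics; zero values for the struct's other fields (e.g. RingSize, UnixTimeNs) are fine here, since the comparison only reads RangeStart and RangeStop:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

func main() {
	a := topic.Partition{RangeStart: 0, RangeStop: 512}
	b := topic.Partition{RangeStart: 512, RangeStop: 1024} // adjacent to a
	c := topic.Partition{RangeStart: 0, RangeStop: 1024}   // covers both

	fmt.Println(a.Overlaps(b)) // false: [0,512) and [512,1024) only touch
	fmt.Println(a.Overlaps(c)) // true: c covers a
	fmt.Println(b.Overlaps(c)) // true: c covers b
}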