Browse Source

send assignments to newly connected subscribers

pull/5637/head
chrislu 12 months ago
parent
commit
c8090b1f99
  1. 12
      weed/mq/broker/broker_grpc_pub.go
  2. 22
      weed/mq/broker/broker_grpc_sub_coordinator.go

12
weed/mq/broker/broker_grpc_pub.go

@ -150,11 +150,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress()
glog.V(0).Infof("broker %s load topic %v partition %v", self, t, p)
// load local topic partition from configuration on filer if not found
var conf *mq_pb.ConfigureTopicResponse
conf, err = b.readTopicConfFromFiler(t, p)
conf, err = b.readTopicConfFromFiler(t)
if err != nil {
return nil, err
}
@ -177,17 +176,20 @@ func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p t
return localTopicPartition, nil
}
func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic, p topic.Partition) (conf *mq_pb.ConfigureTopicResponse, err error) {
func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
glog.V(0).Infof("load conf for topic %v from filer", t)
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
if err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
if err != nil {
return fmt.Errorf("read topic %v partition %v conf: %v", t, p, err)
return fmt.Errorf("read topic %v partition %v conf: %v", t, err)
}
// parse into filer conf object
conf = &mq_pb.ConfigureTopicResponse{}
if err = jsonpb.Unmarshal(data, conf); err != nil {
return fmt.Errorf("unmarshal topic %v partition %v conf: %v", t, p, err)
return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
}
return nil
}); err != nil {

22
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -4,6 +4,7 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -35,8 +36,27 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
ctx := stream.Context()
// process ack messages
go func() {
// try to load the partition assignment from filer
if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(conf.BrokerPartitionAssignments))
for i, assignment := range conf.BrokerPartitionAssignments {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
Partition: assignment.Partition,
Broker: assignment.LeaderBroker,
}
}
// send partition assignment to subscriber
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
AssignedPartitions: assignedPartitions,
},
},
}
}
// process ack messages
for {
_, err := stream.Recv()
if err != nil {

Loading…
Cancel
Save