Browse Source

pub/sub brokers check filer for assigned partitions

pull/5637/head
chrislu 12 months ago
parent
commit
2beaa2d0b3
  1. 30
      weed/mq/broker/broker_grpc_pub.go
  2. 11
      weed/mq/broker/broker_grpc_sub.go
  3. 17
      weed/mq/broker/broker_topic_conf_read_write.go

30
weed/mq/broker/broker_grpc_pub.go

@ -54,10 +54,12 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil { if localTopicPartition == nil {
if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
return stream.Send(response) return stream.Send(response)
} }
}
ackInterval = int(initMessage.AckInterval) ackInterval = int(initMessage.AckInterval)
stream.Send(response) stream.Send(response)
} else { } else {
@ -141,34 +143,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return nil return nil
} }
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress()
// load local topic partition from configuration on filer if not found
var conf *mq_pb.ConfigureTopicResponse
conf, err = b.readTopicConfFromFiler(t)
if err != nil {
return nil, err
}
// create local topic partition
var hasCreated bool
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) {
localTopicPartition = topic.FromPbBrokerPartitionAssignment(self, p, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localTopicPartition)
hasCreated = true
break
}
}
if !hasCreated {
return nil, fmt.Errorf("topic %v partition %v not assigned to broker %v", t, p, self)
}
return localTopicPartition, nil
}
// duplicated from master_grpc_server.go // duplicated from master_grpc_server.go
func findClientAddress(ctx context.Context) string { func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx) // fmt.Printf("FromContext %+v\n", ctx)

11
weed/mq/broker/broker_grpc_sub.go

@ -22,8 +22,16 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
var localTopicPartition *topic.LocalPartition var localTopicPartition *topic.LocalPartition
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
for localTopicPartition == nil { for localTopicPartition == nil {
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
if localTopicPartition == nil {
if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
}
if localTopicPartition != nil {
break
}
time.Sleep(337 * time.Millisecond) time.Sleep(337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context // Check if the client has disconnected by monitoring the context
select { select {
@ -38,7 +46,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
default: default:
// Continue processing the request // Continue processing the request
} }
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
} }
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())

17
weed/mq/broker/broker_topic_conf_read_write.go

@ -53,6 +53,23 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil return conf, nil
} }
func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress()
conf, err := b.readTopicConfFromFiler(t)
if err != nil {
return nil, err
}
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
break
}
}
return localPartition, nil
}
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) { func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid // also fix assignee broker if invalid
addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments) addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)

Loading…
Cancel
Save