Browse Source

local partition is generated or not

pull/5637/head
chrislu 10 months ago
parent
commit
55714f54ab
  1. 2
      weed/mq/broker/broker_grpc_pub.go
  2. 4
      weed/mq/broker/broker_grpc_sub.go
  3. 15
      weed/mq/broker/broker_topic_conf_read_write.go

2
weed/mq/broker/broker_grpc_pub.go

@ -53,7 +53,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
var p topic.Partition
if initMessage != nil {
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition, err = b.GetOrGenLocalPartition(t, p)
localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
if err != nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)

4
weed/mq/broker/broker_grpc_sub.go

@ -26,7 +26,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
@ -143,7 +143,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}

15
weed/mq/broker/broker_topic_conf_read_write.go

@ -56,34 +56,35 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
b.accessLock.Lock()
defer b.accessLock.Unlock()
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
localPartition, err = b.genLocalPartitionFromFiler(t, partition)
localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
if err != nil {
return nil, err
return nil, false, err
}
}
return localPartition, nil
return localPartition, isGenerated, nil
}
func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
self := b.option.BrokerAddress()
conf, err := b.readTopicConfFromFiler(t)
if err != nil {
return nil, err
return nil, isGenerated, err
}
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
isGenerated = true
break
}
}
return localPartition, nil
return localPartition, isGenerated, nil
}
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {

Loading…
Cancel
Save