Browse Source

simplify api

pull/5637/head
chrislu 1 year ago
parent
commit
db3670a3a5
  1. 2
      weed/mq/broker/broker_grpc_configure.go
  2. 2
      weed/mq/broker/broker_grpc_lookup.go
  3. 7
      weed/mq/pub_balancer/lookup.go

2
weed/mq/broker/broker_grpc_configure.go

@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
} }
ret := &mq_pb.ConfigureTopicResponse{} ret := &mq_pb.ConfigureTopicResponse{}
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, request.PartitionCount)
for _, bpa := range ret.BrokerPartitionAssignments { for _, bpa := range ret.BrokerPartitionAssignments {
fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)

2
weed/mq/broker/broker_grpc_lookup.go

@ -35,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
ret := &mq_pb.LookupTopicBrokersResponse{} ret := &mq_pb.LookupTopicBrokersResponse{}
ret.Topic = request.Topic ret.Topic = request.Topic
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, false, -1)
ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, -1)
return ret, err return ret, err
} }

7
weed/mq/pub_balancer/lookup.go

@ -10,7 +10,7 @@ var (
ErrNoBroker = errors.New("no broker") ErrNoBroker = errors.New("no broker")
) )
func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) {
func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) {
if partitionCount == 0 { if partitionCount == 0 {
partitionCount = 6 partitionCount = 6
} }
@ -35,10 +35,13 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
} }
} }
} }
if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
if len(assignments) > 0 {
glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments)
return assignments, true, nil return assignments, true, nil
} }
if partitionCount < 0 {
return nil, false, nil
}
// find the topic partitions on the filer // find the topic partitions on the filer
// if the topic is not found // if the topic is not found

Loading…
Cancel
Save