|
|
@ -30,16 +30,17 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. |
|
|
|
} |
|
|
|
|
|
|
|
t := topic.FromPbTopic(request.Topic) |
|
|
|
resp, err = b.readTopicConfFromFiler(t) |
|
|
|
if err != nil { |
|
|
|
var readErr error |
|
|
|
resp, readErr = b.readTopicConfFromFiler(t) |
|
|
|
if readErr != nil { |
|
|
|
glog.V(0).Infof("read topic %s conf: %v", request.Topic, err) |
|
|
|
} else { |
|
|
|
err = b.ensureTopicActiveAssignments(t, resp) |
|
|
|
readErr = b.ensureTopicActiveAssignments(t, resp) |
|
|
|
// no need to assign directly.
|
|
|
|
// The added or updated assignees will read from filer directly.
|
|
|
|
// The gone assignees will die by themselves.
|
|
|
|
} |
|
|
|
if err == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { |
|
|
|
if readErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { |
|
|
|
glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) |
|
|
|
} else { |
|
|
|
if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 { |
|
|
|