diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 042621a4c..660a82f83 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -78,8 +78,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - topicPartition := topic.FromPbPartition(assignment.Partition) - localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, topicPartition)) + localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition)) if request.IsDraining { // TODO drain existing topic partition subscriptions @@ -111,7 +110,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m return ret, nil } -func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition topic.Partition) log_buffer.LogFlushFuncType { +func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType { topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name) partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT) partitionDir := fmt.Sprintf("%s/%s/%4d-%4d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)