diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 008c08bbe..8d3727b1f 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -82,18 +82,20 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition)) + t := topic.FromPbTopic(request.Topic) + partition := topic.FromPbPartition(assignment.Partition) + b.accessLock.Lock() if request.IsDraining { // TODO drain existing topic partition subscriptions - - b.localTopicManager.RemoveTopicPartition( - topic.FromPbTopic(request.Topic), - localPartition.Partition) + b.localTopicManager.RemoveTopicPartition(t, partition) } else { - b.localTopicManager.AddTopicPartition( - topic.FromPbTopic(request.Topic), - localPartition) + var localPartition *topic.LocalPartition + if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition)) + b.localTopicManager.AddTopicPartition(t, localPartition) + } } + b.accessLock.Unlock() } // if is leader, notify the followers to drain existing topic partition subscriptions diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 615964621..1a2c09ca4 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -43,6 +44,7 @@ type MessageQueueBroker struct { lockAsBalancer *cluster.LiveLock currentBalancer pb.ServerAddress Coordinator *sub_coordinator.Coordinator + accessLock sync.Mutex } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 84602add7..f4a080f38 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -84,13 +84,13 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message return p.logBuffer.GetEarliestPosition() } -func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { +func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { isLeader := assignment.LeaderBroker == string(self) followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers)) for i, followerBroker := range assignment.FollowerBrokers { followers[i] = pb.ServerAddress(followerBroker) } - return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers, logFlushFn, readFromDiskFn) + return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn) } func (p *LocalPartition) closePublishers() {