diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index d633a3efa..f31dc7eff 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -36,9 +36,6 @@ import ( // Each subscription may not get data. It can act as a backup. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error { - // 1. write to the volume server - // 2. find the topic metadata owning filer - // 3. write to the filer req, err := stream.Recv() if err != nil { diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 91081c602..d87eff911 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -28,7 +28,7 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition if !manager.topics.SetIfAbsent(topic.String(), localTopic) { localTopic, _ = manager.topics.Get(topic.String()) } - localTopic.AddPartition(localPartition) + localTopic.addPartition(localPartition) } // GetLocalPartition gets a topic from the local topic manager diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go index 35e313742..25ae03df6 100644 --- a/weed/mq/topic/local_topic.go +++ b/weed/mq/topic/local_topic.go @@ -5,6 +5,7 @@ import "sync" type LocalTopic struct { Topic Partitions []*LocalPartition + partitionLock sync.RWMutex } func NewLocalTopic(topic Topic) *LocalTopic { @@ -15,6 +16,9 @@ func NewLocalTopic(topic Topic) *LocalTopic { } func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition { + localTopic.partitionLock.RLock() + defer localTopic.partitionLock.RUnlock() + for _, localPartition := range localTopic.Partitions { if localPartition.Partition.Equals(partition) { return localPartition @@ -23,6 +27,9 @@ func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition return nil } func (localTopic *LocalTopic) removePartition(partition Partition) bool { + localTopic.partitionLock.Lock() + defer localTopic.partitionLock.Unlock() + foundPartitionIndex := -1 for i, localPartition := range localTopic.Partitions { if localPartition.Partition.Equals(partition) { @@ -37,9 +44,13 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool { localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) return true } -func (localTopic *LocalTopic) AddPartition(localPartition *LocalPartition) { - if localTopic.findPartition(localPartition.Partition) != nil { - return +func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) { + localTopic.partitionLock.Lock() + defer localTopic.partitionLock.Unlock() + for _, partition := range localTopic.Partitions { + if localPartition.Partition.Equals(partition.Partition) { + return + } } localTopic.Partitions = append(localTopic.Partitions, localPartition) }