Browse Source

adds locking

pull/5890/head
chrislu 4 months ago
parent
commit
270e91b0be
  1. 3
      weed/mq/broker/broker_grpc_pub.go
  2. 2
      weed/mq/topic/local_manager.go
  3. 17
      weed/mq/topic/local_topic.go

3
weed/mq/broker/broker_grpc_pub.go

@ -36,9 +36,6 @@ import (
// Each subscription may not get data. It can act as a backup. // Each subscription may not get data. It can act as a backup.
func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error { func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
// 1. write to the volume server
// 2. find the topic metadata owning filer
// 3. write to the filer
req, err := stream.Recv() req, err := stream.Recv()
if err != nil { if err != nil {

2
weed/mq/topic/local_manager.go

@ -28,7 +28,7 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition
if !manager.topics.SetIfAbsent(topic.String(), localTopic) { if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String()) localTopic, _ = manager.topics.Get(topic.String())
} }
localTopic.AddPartition(localPartition)
localTopic.addPartition(localPartition)
} }
// GetLocalPartition gets a topic from the local topic manager // GetLocalPartition gets a topic from the local topic manager

17
weed/mq/topic/local_topic.go

@ -5,6 +5,7 @@ import "sync"
type LocalTopic struct { type LocalTopic struct {
Topic Topic
Partitions []*LocalPartition Partitions []*LocalPartition
partitionLock sync.RWMutex
} }
func NewLocalTopic(topic Topic) *LocalTopic { func NewLocalTopic(topic Topic) *LocalTopic {
@ -15,6 +16,9 @@ func NewLocalTopic(topic Topic) *LocalTopic {
} }
func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition { func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
localTopic.partitionLock.RLock()
defer localTopic.partitionLock.RUnlock()
for _, localPartition := range localTopic.Partitions { for _, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) { if localPartition.Partition.Equals(partition) {
return localPartition return localPartition
@ -23,6 +27,9 @@ func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition
return nil return nil
} }
func (localTopic *LocalTopic) removePartition(partition Partition) bool { func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.partitionLock.Lock()
defer localTopic.partitionLock.Unlock()
foundPartitionIndex := -1 foundPartitionIndex := -1
for i, localPartition := range localTopic.Partitions { for i, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) { if localPartition.Partition.Equals(partition) {
@ -37,9 +44,13 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
return true return true
} }
func (localTopic *LocalTopic) AddPartition(localPartition *LocalPartition) {
if localTopic.findPartition(localPartition.Partition) != nil {
return
func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) {
localTopic.partitionLock.Lock()
defer localTopic.partitionLock.Unlock()
for _, partition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition.Partition) {
return
}
} }
localTopic.Partitions = append(localTopic.Partitions, localPartition) localTopic.Partitions = append(localTopic.Partitions, localPartition)
} }

Loading…
Cancel
Save