diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_balancer.go index e602f4bad..079a1612f 100644 --- a/weed/mq/broker/broker_grpc_balancer.go +++ b/weed/mq/broker/broker_grpc_balancer.go @@ -26,7 +26,9 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker) if !found { brokerStats = balancer.NewBrokerStats() - broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats) + if !broker.Balancer.Brokers.SetIfAbsent(initMessage.Broker, brokerStats) { + brokerStats, _ = broker.Balancer.Brokers.Get(initMessage.Broker) + } } } else { return status.Errorf(codes.InvalidArgument, "balancer init message is empty") diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go index 44c2d4ee9..0e76899fa 100644 --- a/weed/mq/broker/broker_grpc_create.go +++ b/weed/mq/broker/broker_grpc_create.go @@ -43,7 +43,9 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker) if !found { brokerStats = balancer.NewBrokerStats() - broker.Balancer.Brokers.Set(bpa.LeaderBroker, brokerStats) + if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { + brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker) + } } brokerStats.RegisterAssignment(request.Topic, bpa.Partition) return nil @@ -52,6 +54,8 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p } } + // TODO revert if some error happens in the middle of the assignments + return ret, err } diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go index 2c06f4668..eb9246e4a 100644 --- a/weed/mq/broker/broker_stats.go +++ b/weed/mq/broker/broker_stats.go @@ -61,7 +61,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error { if err != nil { return fmt.Errorf("send stats message: %v", err) } - glog.V(4).Infof("sent stats: %+v", stats) + glog.V(3).Infof("sent stats: %+v", stats) time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) } diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 432fa4153..a2bc23f24 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -28,7 +28,9 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition Partitions: make([]*LocalPartition, 0), } } - manager.topics.SetIfAbsent(topic.String(), localTopic) + if !manager.topics.SetIfAbsent(topic.String(), localTopic) { + localTopic, _ = manager.topics.Get(topic.String()) + } if localTopic.findPartition(localPartition.Partition) != nil { return } @@ -61,6 +63,15 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br stats := &mq_pb.BrokerStats{ Stats: make(map[string]*mq_pb.TopicPartitionStats), } + + // collect current broker's cpu usage + // this needs to be in front, so the following stats can be more accurate + usages, err := cpu.Percent(duration, false) + if err == nil && len(usages) > 0 { + stats.CpuUsagePercent = int32(usages[0]) + } + + // collect current broker's topics and partitions manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { for _, localPartition := range localTopic.Partitions { topicPartition := &TopicPartition{ @@ -85,12 +96,6 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br } }) - // collect current broker's cpu usage - usages, err := cpu.Percent(duration, false) - if err == nil && len(usages) > 0 { - stats.CpuUsagePercent = int32(usages[0]) - } - return stats }