diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 352a6f69b..48ec0d5bd 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
 						return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
 					}
 				}
-				brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+				brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
 				if !found {
 					brokerStats = pub_balancer.NewBrokerStats()
-					if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
-						brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+					if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+						brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
 					}
 				}
 				brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
index 412407211..54634c9d1 100644
--- a/weed/mq/broker/broker_grpc_balance.go
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -20,8 +20,8 @@ func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.B
 
 	ret := &mq_pb.BalanceTopicsResponse{}
 
-	actions := b.Balancer.BalancePublishers()
-	err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
+	actions := b.PubBalancer.BalancePublishers()
+	err = b.PubBalancer.ExecuteBalanceAction(actions, b.grpcDialOption)
 
 	return ret, err
 }
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7f8afab03..7222c8359 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -60,10 +60,10 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
 		}
 	}
 	resp = &mq_pb.ConfigureTopicResponse{}
-	if b.Balancer.Brokers.IsEmpty() {
+	if b.PubBalancer.Brokers.IsEmpty() {
 		return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
 	}
-	resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
+	resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
 	resp.RecordType = request.RecordType
 
 	// save the topic configuration on filer
@@ -71,7 +71,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
 		return nil, fmt.Errorf("configure topic: %v", err)
 	}
 
-	b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
+	b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
 
 	glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
 
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index da2c64dfc..db62fd88a 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
 
 	ret := &mq_pb.ListTopicsResponse{}
 	knownTopics := make(map[string]struct{})
-	for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
+	for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
 		_, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
 		for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
 			topicPartitionStat := topicPartitionStatsItem.Val
diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index b8675caca..5978d2173 100644
--- a/weed/mq/broker/broker_grpc_pub_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -22,12 +22,12 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
 	initMessage := req.GetInit()
 	var brokerStats *pub_balancer.BrokerStats
 	if initMessage != nil {
-		brokerStats = b.Balancer.AddBroker(initMessage.Broker)
+		brokerStats = b.PubBalancer.AddBroker(initMessage.Broker)
 	} else {
 		return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
 	}
 	defer func() {
-		b.Balancer.RemoveBroker(initMessage.Broker, brokerStats)
+		b.PubBalancer.RemoveBroker(initMessage.Broker, brokerStats)
 	}()
 
 	// process stats message
@@ -40,7 +40,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
 			return status.Errorf(codes.Unavailable, "not current broker balancer")
 		}
 		if receivedStats := req.GetStats(); receivedStats != nil {
-			b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
+			b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
 			// glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
 		}
 	}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index a1b29f45c..266859c24 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -23,13 +23,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
 	// process init message
 	initMessage := req.GetInit()
 	if initMessage != nil {
-		cgi = b.Coordinator.AddSubscriber(initMessage)
+		cgi = b.SubCoordinator.AddSubscriber(initMessage)
 		glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
 	} else {
 		return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
 	}
 	defer func() {
-		b.Coordinator.RemoveSubscriber(initMessage)
+		b.SubCoordinator.RemoveSubscriber(initMessage)
 		glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
 	}()
 
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 2e449083a..cdf652294 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -43,17 +43,17 @@ type MessageQueueBroker struct {
 	filers            map[pb.ServerAddress]struct{}
 	currentFiler      pb.ServerAddress
 	localTopicManager *topic.LocalTopicManager
-	Balancer          *pub_balancer.PubBalancer
-	lockAsBalancer    *cluster.LiveLock
-	Coordinator       *sub_coordinator.SubCoordinator
-	accessLock        sync.Mutex
+	PubBalancer       *pub_balancer.PubBalancer
+	lockAsBalancer *cluster.LiveLock
+	SubCoordinator *sub_coordinator.SubCoordinator
+	accessLock     sync.Mutex
 	fca               *sub_coordinator.FilerClientAccessor
 }
 
 func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
 
-	pub_broker_balancer := pub_balancer.NewBalancer()
-	coordinator := sub_coordinator.NewCoordinator(pub_broker_balancer)
+	pubBalancer := pub_balancer.NewPubBalancer()
+	subCoordinator := sub_coordinator.NewSubCoordinator(pubBalancer)
 
 	mqBroker = &MessageQueueBroker{
 		option:            option,
@@ -61,20 +61,20 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
 		MasterClient:      wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
 		filers:            make(map[pb.ServerAddress]struct{}),
 		localTopicManager: topic.NewLocalTopicManager(),
-		Balancer:          pub_broker_balancer,
-		Coordinator:       coordinator,
+		PubBalancer:       pubBalancer,
+		SubCoordinator:    subCoordinator,
 	}
 	fca := &sub_coordinator.FilerClientAccessor{
 		GetFiler:          mqBroker.GetFiler,
 		GetGrpcDialOption: mqBroker.GetGrpcDialOption,
 	}
 	mqBroker.fca = fca
-	coordinator.FilerClientAccessor = fca
+	subCoordinator.FilerClientAccessor = fca
 
 	mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
-	pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
-	pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
-	pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
+	pubBalancer.OnPartitionChange = mqBroker.SubCoordinator.OnPartitionChange
+	pubBalancer.OnAddBroker = mqBroker.SubCoordinator.OnSubAddBroker
+	pubBalancer.OnRemoveBroker = mqBroker.SubCoordinator.OnSubRemoveBroker
 
 	go mqBroker.MasterClient.KeepConnectedToMaster()
 
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 211473dad..ea5cb71b9 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -52,7 +52,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
 
 func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
 	// also fix assignee broker if invalid
-	hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, 1, conf.BrokerPartitionAssignments)
+	hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
 	if hasChanges {
 		glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
 		if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
diff --git a/weed/mq/pub_balancer/pub_balancer.go b/weed/mq/pub_balancer/pub_balancer.go
index 1c8e31558..3499d2233 100644
--- a/weed/mq/pub_balancer/pub_balancer.go
+++ b/weed/mq/pub_balancer/pub_balancer.go
@@ -37,7 +37,7 @@ type PubBalancer struct {
 	OnRemoveBroker    func(broker string, brokerStats *BrokerStats)
 }
 
-func NewBalancer() *PubBalancer {
+func NewPubBalancer() *PubBalancer {
 	return &PubBalancer{
 		Brokers:        cmap.New[*BrokerStats](),
 		TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
diff --git a/weed/mq/sub_coordinator/sub_coordinator.go b/weed/mq/sub_coordinator/sub_coordinator.go
index f78e8c849..b03397ad8 100644
--- a/weed/mq/sub_coordinator/sub_coordinator.go
+++ b/weed/mq/sub_coordinator/sub_coordinator.go
@@ -22,7 +22,7 @@ type SubCoordinator struct {
 	FilerClientAccessor *FilerClientAccessor
 }
 
-func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
+func NewSubCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
 	return &SubCoordinator{
 		TopicSubscribers: cmap.New[*TopicConsumerGroups](),
 		balancer:         balancer,