Browse Source

move initial assignment to rebalance logic

pull/5890/head
chrislu 7 months ago
parent
commit
972e9faaa2
  1. 13
      weed/mq/broker/broker_grpc_sub_coordinator.go
  2. 9
      weed/mq/broker/broker_server.go
  3. 11
      weed/mq/sub_coordinator/consumer_group.go

13
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -4,7 +4,6 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -37,18 +36,6 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
ctx := stream.Context()
go func() {
// try to load the partition assignment from filer
if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
// send partition assignment to subscriber
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
PartitionAssignments: conf.BrokerPartitionAssignments,
},
},
}
}
// process ack messages
for {
_, err := stream.Recv()

9
weed/mq/broker/broker_server.go

@ -65,8 +65,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
Coordinator: coordinator,
}
fca := &sub_coordinator.FilerClientAccessor{
GetFilerFn: mqBroker.GetFiler,
GrpcDialOption: grpcDialOption,
GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption,
}
mqBroker.fca = fca
coordinator.FilerClientAccessor = fca
@ -126,6 +126,11 @@ func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate,
}
func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption {
return b.grpcDialOption
}
func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
return b.currentFiler
}

11
weed/mq/sub_coordinator/consumer_group.go

@ -77,10 +77,13 @@ func (cg *ConsumerGroup) BalanceConsumerGroupInstances(knownPartitionSlotToBroke
// collect current topic partitions
partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if partitionSlotToBrokerList == nil {
var found bool
partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
if !found {
glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
partitionSlotToBrokerList = pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
for _, assignment := range conf.BrokerPartitionAssignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker)
}
} else {
glog.V(0).Infof("fail to read topic conf from filer: %v", err)
return
}
}

Loading…
Cancel
Save