Browse Source

fix assignments if brokers changed

pull/5637/head
chrislu 11 months ago
parent
commit
b0a2e9aea3
  1. 11
      weed/mq/broker/broker_topic_conf_read_write.go
  2. 20
      weed/mq/pub_balancer/allocate.go

11
weed/mq/broker/broker_topic_conf_read_write.go

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@ -46,5 +47,15 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
}); err != nil {
return nil, err
}
// also fix assignee broker if invalid
changedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)
if len(changedAssignments) > 0 {
glog.V(0).Infof("topic %v partition assignments changed: %v", t, changedAssignments)
if err = b.saveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
return nil, err
}
}
return conf, err
}

20
weed/mq/pub_balancer/allocate.go

@ -55,3 +55,23 @@ func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32)
}
return pickedBrokers
}
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string,*BrokerStats], assignments []*mq_pb.BrokerPartitionAssignment) (changedAssignments []*mq_pb.BrokerPartitionAssignment) {
for _, assignment := range assignments {
if assignment.LeaderBroker == "" {
changedAssignments = append(changedAssignments, assignment)
continue
}
if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
changedAssignments = append(changedAssignments, assignment)
continue
}
}
// pick the brokers with the least number of partitions
pickedBrokers := pickBrokers(activeBrokers, int32(len(changedAssignments)))
for i, assignment := range changedAssignments {
assignment.LeaderBroker = pickedBrokers[i]
}
return changedAssignments
}
Loading…
Cancel
Save