Browse Source

refactor

pull/5637/head
chrislu 12 months ago
parent
commit
c3f8530f97
  1. 48
      weed/mq/broker/broker_grpc_assign.go
  2. 46
      weed/mq/broker/broker_grpc_configure.go

48
weed/mq/broker/broker_grpc_assign.go

@ -4,9 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
) )
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
@ -50,3 +52,49 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil return ret, nil
} }
// called by broker leader to drain existing partitions.
// new/updated partitions will be detected by broker from the filer
func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
// notify the brokers to create the topic partitions in parallel
var wg sync.WaitGroup
for _, bpa := range assignments {
wg.Add(1)
go func(bpa *mq_pb.BrokerPartitionAssignment) {
defer wg.Done()
if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
Topic: t,
BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
{
Partition: bpa.Partition,
},
},
IsLeader: true,
IsDraining: !isAdd,
})
if doCreateErr != nil {
if !isAdd {
return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
} else {
return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
}
}
brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = pub_balancer.NewBrokerStats()
if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
return nil
}); doCreateErr != nil {
glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
}
}(bpa)
}
wg.Wait()
return nil
}

46
weed/mq/broker/broker_grpc_configure.go

@ -6,11 +6,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"sync"
) )
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@ -62,47 +60,3 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return resp, err return resp, err
} }
func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
// notify the brokers to create the topic partitions in parallel
var wg sync.WaitGroup
for _, bpa := range assignments {
wg.Add(1)
go func(bpa *mq_pb.BrokerPartitionAssignment) {
defer wg.Done()
if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
_, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
Topic: t,
BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
{
Partition: bpa.Partition,
},
},
IsLeader: true,
IsDraining: !isAdd,
})
if doCreateErr != nil {
if !isAdd {
return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
} else {
return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
}
}
brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = pub_balancer.NewBrokerStats()
if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
return nil
}); doCreateErr != nil {
glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
}
}(bpa)
}
wg.Wait()
return nil
}
Loading…
Cancel
Save