Browse Source

proxy to broker leader

pull/5637/head
Chris Lu 11 months ago
parent
commit
656b78d1af
  1. 4
      weed/cluster/lock_client.go
  2. 5
      weed/mq/broker/broker_grpc_balance.go
  3. 4
      weed/mq/broker/broker_grpc_configure.go
  4. 10
      weed/mq/broker/broker_grpc_lookup.go
  5. 4
      weed/mq/broker/broker_grpc_pub_balancer.go
  6. 4
      weed/mq/broker/broker_grpc_sub_coordinator.go

4
weed/cluster/lock_client.go

@ -122,10 +122,6 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
return nil return nil
} }
func (lock *LiveLock) IsLocked() bool {
return lock != nil && lock.isLocked
}
func (lock *LiveLock) StopShortLivedLock() error { func (lock *LiveLock) StopShortLivedLock() error {
if !lock.isLocked { if !lock.isLocked {
return nil return nil

5
weed/mq/broker/broker_grpc_balance.go

@ -4,14 +4,9 @@ import (
"context" "context"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) { func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !b.isLockOwner() { if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.BalanceTopics(ctx, request) resp, err = client.BalanceTopics(ctx, request)

4
weed/mq/broker/broker_grpc_configure.go

@ -16,10 +16,6 @@ import (
// It generates an assignments based on existing allocations, // It generates an assignments based on existing allocations,
// and then assign the partitions to the brokers. // and then assign the partitions to the brokers.
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) { func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
if !b.lockAsBalancer.IsLocked() {
glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked())
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !b.isLockOwner() { if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ConfigureTopic(ctx, request) resp, err = client.ConfigureTopic(ctx, request)

10
weed/mq/broker/broker_grpc_lookup.go

@ -7,16 +7,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// LookupTopicBrokers returns the brokers that are serving the topic // LookupTopicBrokers returns the brokers that are serving the topic
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !b.lockAsBalancer.IsLocked() {
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request) resp, err = client.LookupTopicBrokers(ctx, request)
return nil return nil
@ -42,9 +37,6 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
} }
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !b.isLockOwner() { if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request) resp, err = client.ListTopics(ctx, request)

4
weed/mq/broker/broker_grpc_pub_balancer.go

@ -9,7 +9,7 @@ import (
// PublisherToPubBalancer receives connections from brokers and collects stats // PublisherToPubBalancer receives connections from brokers and collects stats
func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error { func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error {
if !b.lockAsBalancer.IsLocked() {
if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer") return status.Errorf(codes.Unavailable, "not current broker balancer")
} }
req, err := stream.Recv() req, err := stream.Recv()
@ -35,7 +35,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
if err != nil { if err != nil {
return err return err
} }
if !b.lockAsBalancer.IsLocked() {
if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer") return status.Errorf(codes.Unavailable, "not current broker balancer")
} }
if receivedStats := req.GetStats(); receivedStats != nil { if receivedStats := req.GetStats(); receivedStats != nil {

4
weed/mq/broker/broker_grpc_sub_coordinator.go

@ -12,7 +12,7 @@ import (
// SubscriberToSubCoordinator coordinates the subscribers // SubscriberToSubCoordinator coordinates the subscribers
func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error { func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error {
if !b.lockAsBalancer.IsLocked() {
if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer") return status.Errorf(codes.Unavailable, "not current broker balancer")
} }
req, err := stream.Recv() req, err := stream.Recv()
@ -43,7 +43,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
for i, assignment := range conf.BrokerPartitionAssignments { for i, assignment := range conf.BrokerPartitionAssignments {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
Partition: assignment.Partition, Partition: assignment.Partition,
Broker: assignment.LeaderBroker,
Broker: assignment.LeaderBroker,
} }
} }
// send partition assignment to subscriber // send partition assignment to subscriber

Loading…
Cancel
Save