diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 57801735c..4ebb067df 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -122,10 +122,6 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { return nil } -func (lock *LiveLock) IsLocked() bool { - return lock != nil && lock.isLocked -} - func (lock *LiveLock) StopShortLivedLock() error { if !lock.isLocked { return nil diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go index 4d74cede4..412407211 100644 --- a/weed/mq/broker/broker_grpc_balance.go +++ b/weed/mq/broker/broker_grpc_balance.go @@ -4,14 +4,9 @@ import ( "context" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) { - if !b.lockAsBalancer.IsLocked() { - return nil, status.Errorf(codes.Unavailable, "no balancer") - } if !b.isLockOwner() { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.BalanceTopics(ctx, request) diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index d882eeea7..6a6e92922 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -16,10 +16,6 @@ import ( // It generates an assignments based on existing allocations, // and then assign the partitions to the brokers. func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) { - if !b.lockAsBalancer.IsLocked() { - glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked()) - return nil, status.Errorf(codes.Unavailable, "no balancer") - } if !b.isLockOwner() { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.ConfigureTopic(ctx, request) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 7881090c9..14c1f37da 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -7,16 +7,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // LookupTopicBrokers returns the brokers that are serving the topic func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) { - if !b.lockAsBalancer.IsLocked() { - return nil, status.Errorf(codes.Unavailable, "no balancer") - } - if !b.lockAsBalancer.IsLocked() { + if !b.isLockOwner() { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.LookupTopicBrokers(ctx, request) return nil @@ -42,9 +37,6 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq } func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { - if !b.lockAsBalancer.IsLocked() { - return nil, status.Errorf(codes.Unavailable, "no balancer") - } if !b.isLockOwner() { proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.ListTopics(ctx, request) diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go index 4edceb8a0..2cf90b4bc 100644 --- a/weed/mq/broker/broker_grpc_pub_balancer.go +++ b/weed/mq/broker/broker_grpc_pub_balancer.go @@ -9,7 +9,7 @@ import ( // PublisherToPubBalancer receives connections from brokers and collects stats func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error { - if !b.lockAsBalancer.IsLocked() { + if !b.isLockOwner() { return status.Errorf(codes.Unavailable, "not current broker balancer") } req, err := stream.Recv() @@ -35,7 +35,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin if err != nil { return err } - if !b.lockAsBalancer.IsLocked() { + if !b.isLockOwner() { return status.Errorf(codes.Unavailable, "not current broker balancer") } if receivedStats := req.GetStats(); receivedStats != nil { diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 94bd6b0e2..89c221af5 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -12,7 +12,7 @@ import ( // SubscriberToSubCoordinator coordinates the subscribers func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error { - if !b.lockAsBalancer.IsLocked() { + if !b.isLockOwner() { return status.Errorf(codes.Unavailable, "not current broker balancer") } req, err := stream.Recv() @@ -43,7 +43,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess for i, assignment := range conf.BrokerPartitionAssignments { assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ Partition: assignment.Partition, - Broker: assignment.LeaderBroker, + Broker: assignment.LeaderBroker, } } // send partition assignment to subscriber