diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index e1f5da74d..132330568 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -62,6 +62,13 @@ func (sa ServerAddress) ToHttpAddress() string { return string(sa) } +func (sa ServerAddress) Equals(other ServerAddress) bool { + if sa == other { + return true + } + return sa.ToHttpAddress() == other.ToHttpAddress() +} + func (sa ServerAddress) ToGrpcAddress() string { portsSepIndex := strings.LastIndex(string(sa), ":") if portsSepIndex < 0 { diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index d33782ae1..395eb4ebd 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -110,10 +110,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ if !ms.Topo.IsLeader() { // tell the volume servers about the leader - newLeader, err := ms.Topo.Leader() - if err != nil { + newLeader, err := ms.Topo.MaybeLeader() + if err != nil || newLeader == "" { glog.Warningf("SendHeartbeat find leader: %v", err) - return err + return raft.NotLeaderError } if err := stream.Send(&master_pb.HeartbeatResponse{ Leader: string(newLeader), diff --git a/weed/topology/topology.go b/weed/topology/topology.go index db7cb17d7..d16db06dd 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -117,8 +117,10 @@ func (t *Topology) IsLeader() bool { if t.RaftServer.State() == raft.Leader { return true } - if leader, err := t.Leader(); err == nil { - if pb.ServerAddress(t.RaftServer.Name()) == leader { + // Directly check leader to avoid re-acquiring lock via MaybeLeader() + leader := pb.ServerAddress(t.RaftServer.Leader()) + if leader != "" { + if pb.ServerAddress(t.RaftServer.Name()).Equals(leader) { return true } } @@ -175,6 +177,16 @@ func (t *Topology) Leader() (l pb.ServerAddress, err error) { func() (l pb.ServerAddress, err error) { l, err = t.MaybeLeader() if err == nil && l == "" { + // Thread-safe check if we are the leader + t.RaftServerAccessLock.RLock() + if t.RaftServer != nil && t.RaftServer.State() == raft.Leader { + l = pb.ServerAddress(t.RaftServer.Name()) + } + t.RaftServerAccessLock.RUnlock() + + if l != "" { + return l, nil + } err = leaderNotSelected } return l, err