diff --git a/weed/command/master.go b/weed/command/master.go index 6ef511742..908299c8a 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -206,11 +206,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { if !*m.raftHashicorp { go func() { time.Sleep(timeSleep) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() - } + + ms.Topo.RaftServerAccessLock.RLock() + isEmptyMaster := ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() + if isEmptyMaster && isTheFirstOne(myMasterAddress, peers) && ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() } + ms.Topo.RaftServerAccessLock.RUnlock() }() } diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go index 45a6c7a7e..7f8ad70df 100644 --- a/weed/server/master_grpc_server_raft.go +++ b/weed/server/master_grpc_server_raft.go @@ -3,7 +3,9 @@ package weed_server import ( "context" "fmt" + "github.com/hashicorp/raft" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -11,11 +13,14 @@ import ( func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) { resp := &master_pb.RaftListClusterServersResponse{} + ms.Topo.RaftServerAccessLock.RLock() if ms.Topo.HashicorpRaft == nil { + ms.Topo.RaftServerAccessLock.RUnlock() return resp, nil } servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers + ms.Topo.RaftServerAccessLock.RUnlock() for _, server := range servers { resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ @@ -30,6 +35,9 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) { resp := &master_pb.RaftAddServerResponse{} + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + if ms.Topo.HashicorpRaft == nil { return resp, nil } @@ -54,6 +62,9 @@ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAd func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) { resp := &master_pb.RaftRemoveServerResponse{} + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + if ms.Topo.HashicorpRaft == nil { return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index feee59455..ecbfd64af 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -166,6 +166,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { var raftServerName string + + ms.Topo.RaftServerAccessLock.Lock() if raftServer.raftServer != nil { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { @@ -193,14 +195,18 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { }() raftServerName = ms.Topo.HashicorpRaft.String() } + ms.Topo.RaftServerAccessLock.Unlock() + if ms.Topo.IsLeader() { glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { + ms.Topo.RaftServerAccessLock.RLock() if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } + ms.Topo.RaftServerAccessLock.RUnlock() } } @@ -210,16 +216,15 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { f(w, r) return } - var raftServerLeader string - if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - raftServerLeader = ms.Topo.RaftServer.Leader() - } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { - raftServerLeader = string(ms.Topo.HashicorpRaft.Leader()) - } + + // get the current raft leader + leaderAddr, _ := ms.Topo.MaybeLeader() + raftServerLeader := string(leaderAddr) if raftServerLeader == "" { f(w, r) return } + ms.boundedLeaderChan <- 1 defer func() { <-ms.boundedLeaderChan }() targetUrl, err := url.Parse("http://" + raftServerLeader) @@ -228,6 +233,8 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err)) return } + + // proxy to leader glog.V(4).Infoln("proxying to leader", raftServerLeader) proxy := httputil.NewSingleHostReverseProxy(targetUrl) director := proxy.Director @@ -336,6 +343,9 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { return } diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index e377a0e19..2c6fa3474 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -16,6 +16,10 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId() + + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + if ms.Topo.RaftServer != nil { args := struct { Version string diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 546642841..35224d280 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -43,10 +43,11 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server - HashicorpRaft *hashicorpRaft.Raft - UuidAccessLock sync.RWMutex - UuidMap map[string][]string + RaftServer raft.Server + RaftServerAccessLock sync.RWMutex + HashicorpRaft *hashicorpRaft.Raft + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -73,6 +74,9 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls } func (t *Topology) IsLeader() bool { + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + if t.RaftServer != nil { if t.RaftServer.State() == raft.Leader { return true @@ -90,23 +94,35 @@ func (t *Topology) IsLeader() bool { return false } -func (t *Topology) Leader() (pb.ServerAddress, error) { - var l pb.ServerAddress +func (t *Topology) Leader() (l pb.ServerAddress, err error) { for count := 0; count < 3; count++ { - if t.RaftServer != nil { - l = pb.ServerAddress(t.RaftServer.Leader()) - } else if t.HashicorpRaft != nil { - l = pb.ServerAddress(t.HashicorpRaft.Leader()) - } else { - return "", errors.New("Raft Server not ready yet!") + l, err = t.MaybeLeader() + if err != nil { + return } if l != "" { break - } else { - time.Sleep(time.Duration(5+count) * time.Second) } + + time.Sleep(time.Duration(5+count) * time.Second) + } + + return +} + +func (t *Topology) MaybeLeader() (l pb.ServerAddress, err error) { + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + + if t.RaftServer != nil { + l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) + } else { + err = errors.New("Raft Server not ready yet!") } - return l, nil + + return } func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { @@ -136,6 +152,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() + + t.RaftServerAccessLock.RLock() + defer t.RaftServerAccessLock.RUnlock() + if t.RaftServer != nil { if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { return 0, err