diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 65fa622e7..44a1664c0 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -186,22 +186,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name()) } else if raftServer.RaftHashicorp != nil { ms.Topo.HashicorpRaft = raftServer.RaftHashicorp - leaderCh := raftServer.RaftHashicorp.LeaderCh() - prevLeader, _ := ms.Topo.HashicorpRaft.LeaderWithID() raftServerName = ms.Topo.HashicorpRaft.String() - go func() { - for { - select { - case isLeader := <-leaderCh: - ms.Topo.RaftServerAccessLock.RLock() - leader, _ := ms.Topo.HashicorpRaft.LeaderWithID() - ms.Topo.RaftServerAccessLock.RUnlock() - glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() - prevLeader = leader - } - } - }() } ms.Topo.RaftServerAccessLock.Unlock() diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index d06066b93..299df323a 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -5,6 +5,14 @@ package weed_server import ( "fmt" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" + transport "github.com/Jille/raft-grpc-transport" "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" @@ -14,13 +22,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/stats" "google.golang.org/grpc" - "math/rand" - "os" - "path" - "path/filepath" - "sort" - "strings" - "time" ) const ( @@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { return cfg } -func (s *RaftServer) UpdatePeers() { +func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { for { + prevLeader, _ := s.RaftHashicorp.LeaderWithID() select { case isLeader := <-s.RaftHashicorp.LeaderCh(): + leader, _ := s.RaftHashicorp.LeaderWithID() if isLeader { - peerLeader := string(s.serverAddr) - existsPeerName := make(map[string]bool) - for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerLeader { - continue - } - existsPeerName[string(server.ID)] = true - } - for _, peer := range s.peers { - peerName := string(peer) - if peerName == peerLeader || existsPeerName[peerName] { - continue - } - glog.V(0).Infof("adding new peer: %s", peerName) - s.RaftHashicorp.AddVoter( - raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) - } - for peer := range existsPeerName { - if _, found := s.peers[peer]; !found { - glog.V(0).Infof("removing old peer: %s", peer) - s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) - } - } - if _, found := s.peers[peerLeader]; !found { - glog.V(0).Infof("removing old leader peer: %s", peerLeader) - s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + + if updatePeers { + s.updatePeers() + updatePeers = false } + + s.topo.DoBarrier() + + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + } else { + s.topo.BarrierReset() } - return - case <-time.After(updatePeersTimeout): - return + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + prevLeader = leader } } } +func (s *RaftServer) updatePeers() { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } +} + func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { if err != nil { return nil, fmt.Errorf("raft.NewRaft: %v", err) } + + updatePeers := false if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { cfg := s.AddPeersConfiguration() // Need to get lock, in case all servers do this at the same time. @@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } else { - go s.UpdatePeers() + updatePeers = true } + go s.monitorLeaderLoop(updatePeers) + ticker := time.NewTicker(c.HeartbeatTimeout * 10) if glog.V(4) { go func() { diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d718ecac7..4bcd808c2 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,13 +2,14 @@ package weed_server import ( "encoding/json" - transport "github.com/Jille/raft-grpc-transport" "io" "math/rand" "os" "path" "time" + transport "github.com/Jille/raft-grpc-transport" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/pb" diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 01822cbf2..44566e361 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -50,8 +50,11 @@ type Topology struct { RaftServer raft.Server RaftServerAccessLock sync.RWMutex HashicorpRaft *hashicorpRaft.Raft - UuidAccessLock sync.RWMutex - UuidMap map[string][]string + barrierLock sync.Mutex + barrierDone bool + + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool { return false } +func (t *Topology) IsLeaderAndCanRead() bool { + if t.RaftServer != nil { + return t.IsLeader() + } else if t.HashicorpRaft != nil { + return t.IsLeader() && t.DoBarrier() + } else { + return false + } +} + +func (t *Topology) DoBarrier() bool { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + if t.barrierDone { + return true + } + + glog.V(0).Infof("raft do barrier") + barrier := t.HashicorpRaft.Barrier(2 * time.Minute) + if err := barrier.Error(); err != nil { + glog.Errorf("failed to wait for barrier, error %s", err) + return false + + } + + t.barrierDone = true + glog.V(0).Infof("raft do barrier success") + return true +} + +func (t *Topology) BarrierReset() { + t.barrierLock.Lock() + defer t.barrierLock.Unlock() + t.barrierDone = false +} + func (t *Topology) Leader() (l pb.ServerAddress, err error) { exponentialBackoff := backoff.NewExponentialBackOff() exponentialBackoff.InitialInterval = 100 * time.Millisecond @@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* } func (t *Topology) NextVolumeId() (needle.VolumeId, error) { + if !t.IsLeaderAndCanRead() { + return 0, fmt.Errorf("as leader can not read yet") + + } vid := t.GetMaxVolumeId() next := vid.Next()