You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

180 lines
4.3 KiB

5 years ago
5 years ago
11 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "sort"
  8. "strings"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/raft"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/topology"
  15. )
  16. type RaftServerOption struct {
  17. GrpcDialOption grpc.DialOption
  18. Peers []pb.ServerAddress
  19. ServerAddr pb.ServerAddress
  20. DataDir string
  21. Topo *topology.Topology
  22. RaftResumeState bool
  23. HeartbeatInterval time.Duration
  24. ElectionTimeout time.Duration
  25. }
  26. type RaftServer struct {
  27. peers []pb.ServerAddress // initial peers to join with
  28. raftServer raft.Server
  29. dataDir string
  30. serverAddr pb.ServerAddress
  31. topo *topology.Topology
  32. *raft.GrpcServer
  33. }
  34. type StateMachine struct {
  35. raft.StateMachine
  36. topo *topology.Topology
  37. }
  38. func (s StateMachine) Save() ([]byte, error) {
  39. state := topology.MaxVolumeIdCommand{
  40. MaxVolumeId: s.topo.GetMaxVolumeId(),
  41. }
  42. glog.V(1).Infof("Save raft state %+v", state)
  43. return json.Marshal(state)
  44. }
  45. func (s StateMachine) Recovery(data []byte) error {
  46. state := topology.MaxVolumeIdCommand{}
  47. err := json.Unmarshal(data, &state)
  48. if err != nil {
  49. return err
  50. }
  51. glog.V(1).Infof("Recovery raft state %+v", state)
  52. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  53. return nil
  54. }
  55. func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
  56. s := &RaftServer{
  57. peers: option.Peers,
  58. serverAddr: option.ServerAddr,
  59. dataDir: option.DataDir,
  60. topo: option.Topo,
  61. }
  62. if glog.V(4) {
  63. raft.SetLogLevel(2)
  64. }
  65. raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
  66. var err error
  67. transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
  68. glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
  69. // always clear previous log to avoid server is promotable
  70. os.RemoveAll(path.Join(s.dataDir, "log"))
  71. if !option.RaftResumeState {
  72. // always clear previous metadata
  73. os.RemoveAll(path.Join(s.dataDir, "conf"))
  74. os.RemoveAll(path.Join(s.dataDir, "snapshot"))
  75. }
  76. if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
  77. return nil, err
  78. }
  79. stateMachine := StateMachine{topo: option.Topo}
  80. s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
  81. if err != nil {
  82. glog.V(0).Infoln(err)
  83. return nil, err
  84. }
  85. heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
  86. s.raftServer.SetHeartbeatInterval(heartbeatInterval)
  87. s.raftServer.SetElectionTimeout(option.ElectionTimeout)
  88. if err := s.raftServer.LoadSnapshot(); err != nil {
  89. return nil, err
  90. }
  91. if err := s.raftServer.Start(); err != nil {
  92. return nil, err
  93. }
  94. for _, peer := range s.peers {
  95. if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
  96. return nil, err
  97. }
  98. }
  99. // Remove deleted peers
  100. for existsPeerName := range s.raftServer.Peers() {
  101. exists := false
  102. var existingPeer pb.ServerAddress
  103. for _, peer := range s.peers {
  104. if peer.String() == existsPeerName {
  105. exists, existingPeer = true, peer
  106. break
  107. }
  108. }
  109. if !exists {
  110. if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
  111. glog.V(0).Infoln(err)
  112. return nil, err
  113. } else {
  114. glog.V(0).Infof("removing old peer: %s", existingPeer)
  115. }
  116. }
  117. }
  118. s.GrpcServer = raft.NewGrpcServer(s.raftServer)
  119. if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
  120. // Initialize the server by joining itself.
  121. // s.DoJoinCommand()
  122. }
  123. glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
  124. return s, nil
  125. }
  126. func (s *RaftServer) Peers() (members []string) {
  127. peers := s.raftServer.Peers()
  128. for _, p := range peers {
  129. members = append(members, p.Name)
  130. }
  131. return
  132. }
  133. func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
  134. sort.Slice(peers, func(i, j int) bool {
  135. return strings.Compare(string(peers[i]), string(peers[j])) < 0
  136. })
  137. if len(peers) <= 0 {
  138. return true
  139. }
  140. return self == peers[0]
  141. }
  142. func (s *RaftServer) DoJoinCommand() {
  143. glog.V(0).Infoln("Initializing new cluster")
  144. if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
  145. Name: s.raftServer.Name(),
  146. ConnectionString: s.serverAddr.ToGrpcAddress(),
  147. }); err != nil {
  148. glog.Errorf("fail to send join command: %v", err)
  149. }
  150. }