198 lines
4.9 KiB

5 years ago
5 years ago
5 years ago
3 years ago
3 years ago
11 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. transport "github.com/Jille/raft-grpc-transport"
  5. "io"
  6. "math/rand"
  7. "os"
  8. "path"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. hashicorpRaft "github.com/hashicorp/raft"
  13. "github.com/seaweedfs/raft"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/topology"
  16. )
  17. type RaftServerOption struct {
  18. GrpcDialOption grpc.DialOption
  19. Peers map[string]pb.ServerAddress
  20. ServerAddr pb.ServerAddress
  21. DataDir string
  22. Topo *topology.Topology
  23. RaftResumeState bool
  24. HeartbeatInterval time.Duration
  25. ElectionTimeout time.Duration
  26. RaftBootstrap bool
  27. }
  28. type RaftServer struct {
  29. peers map[string]pb.ServerAddress // initial peers to join with
  30. raftServer raft.Server
  31. RaftHashicorp *hashicorpRaft.Raft
  32. TransportManager *transport.Manager
  33. dataDir string
  34. serverAddr pb.ServerAddress
  35. topo *topology.Topology
  36. *raft.GrpcServer
  37. }
  38. type StateMachine struct {
  39. raft.StateMachine
  40. topo *topology.Topology
  41. }
  42. var _ hashicorpRaft.FSM = &StateMachine{}
  43. func (s StateMachine) Save() ([]byte, error) {
  44. state := topology.MaxVolumeIdCommand{
  45. MaxVolumeId: s.topo.GetMaxVolumeId(),
  46. }
  47. glog.V(1).Infof("Save raft state %+v", state)
  48. return json.Marshal(state)
  49. }
  50. func (s StateMachine) Recovery(data []byte) error {
  51. state := topology.MaxVolumeIdCommand{}
  52. err := json.Unmarshal(data, &state)
  53. if err != nil {
  54. return err
  55. }
  56. glog.V(1).Infof("Recovery raft state %+v", state)
  57. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  58. return nil
  59. }
  60. func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
  61. before := s.topo.GetMaxVolumeId()
  62. state := topology.MaxVolumeIdCommand{}
  63. err := json.Unmarshal(l.Data, &state)
  64. if err != nil {
  65. return err
  66. }
  67. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  68. glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
  69. return nil
  70. }
  71. func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
  72. return &topology.MaxVolumeIdCommand{
  73. MaxVolumeId: s.topo.GetMaxVolumeId(),
  74. }, nil
  75. }
  76. func (s *StateMachine) Restore(r io.ReadCloser) error {
  77. b, err := io.ReadAll(r)
  78. if err != nil {
  79. return err
  80. }
  81. if err := s.Recovery(b); err != nil {
  82. return err
  83. }
  84. return nil
  85. }
  86. func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
  87. s := &RaftServer{
  88. peers: option.Peers,
  89. serverAddr: option.ServerAddr,
  90. dataDir: option.DataDir,
  91. topo: option.Topo,
  92. }
  93. if glog.V(4) {
  94. raft.SetLogLevel(2)
  95. }
  96. raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
  97. var err error
  98. transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
  99. glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
  100. // always clear previous log to avoid server is promotable
  101. os.RemoveAll(path.Join(s.dataDir, "log"))
  102. if !option.RaftResumeState {
  103. // always clear previous metadata
  104. os.RemoveAll(path.Join(s.dataDir, "conf"))
  105. os.RemoveAll(path.Join(s.dataDir, "snapshot"))
  106. }
  107. if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
  108. return nil, err
  109. }
  110. stateMachine := StateMachine{topo: option.Topo}
  111. s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
  112. if err != nil {
  113. glog.V(0).Infoln(err)
  114. return nil, err
  115. }
  116. heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
  117. s.raftServer.SetHeartbeatInterval(heartbeatInterval)
  118. s.raftServer.SetElectionTimeout(option.ElectionTimeout)
  119. if err := s.raftServer.LoadSnapshot(); err != nil {
  120. return nil, err
  121. }
  122. if err := s.raftServer.Start(); err != nil {
  123. return nil, err
  124. }
  125. for name, peer := range s.peers {
  126. if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
  127. return nil, err
  128. }
  129. }
  130. // Remove deleted peers
  131. for existsPeerName := range s.raftServer.Peers() {
  132. if existingPeer, found := s.peers[existsPeerName]; !found {
  133. if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
  134. glog.V(0).Infoln(err)
  135. return nil, err
  136. } else {
  137. glog.V(0).Infof("removing old peer: %s", existingPeer)
  138. }
  139. }
  140. }
  141. s.GrpcServer = raft.NewGrpcServer(s.raftServer)
  142. glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
  143. return s, nil
  144. }
  145. func (s *RaftServer) Peers() (members []string) {
  146. if s.raftServer != nil {
  147. peers := s.raftServer.Peers()
  148. for _, p := range peers {
  149. members = append(members, p.Name)
  150. }
  151. } else if s.RaftHashicorp != nil {
  152. cfg := s.RaftHashicorp.GetConfiguration()
  153. for _, p := range cfg.Configuration().Servers {
  154. members = append(members, string(p.ID))
  155. }
  156. }
  157. return
  158. }
  159. func (s *RaftServer) DoJoinCommand() {
  160. glog.V(0).Infoln("Initializing new cluster")
  161. if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
  162. Name: s.raftServer.Name(),
  163. ConnectionString: s.serverAddr.ToGrpcAddress(),
  164. }); err != nil {
  165. glog.Errorf("fail to send join command: %v", err)
  166. }
  167. }