You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

200 lines
5.0 KiB

2 years ago
5 years ago
5 years ago
5 years ago
3 years ago
2 years ago
3 years ago
11 years ago
3 years ago
3 years ago
2 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. transport "github.com/Jille/raft-grpc-transport"
  5. "github.com/seaweedfs/raft/protobuf"
  6. "io"
  7. "math/rand"
  8. "os"
  9. "path"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. hashicorpRaft "github.com/hashicorp/raft"
  14. "github.com/seaweedfs/raft"
  15. "github.com/seaweedfs/seaweedfs/weed/glog"
  16. "github.com/seaweedfs/seaweedfs/weed/topology"
  17. )
// RaftServerOption carries the settings NewRaftServer uses to build and start
// a seaweedfs/raft server for this master.
type RaftServerOption struct {
	// GrpcDialOption is passed to raft.NewGrpcTransporter for peer connections.
	GrpcDialOption grpc.DialOption
	// Peers is the configured peer set keyed by peer name. NewRaftServer adds
	// every entry (using its gRPC address) and removes raft peers that are not
	// listed here.
	Peers map[string]pb.ServerAddress
	// ServerAddr is this server's address; its string form is used as the raft
	// server name.
	ServerAddr pb.ServerAddress
	// DataDir is the raft working directory; NewRaftServer manages its "log",
	// "conf" and "snapshot" subdirectories.
	DataDir string
	// Topo is the topology whose max volume id is carried by the raft state
	// machine.
	Topo *topology.Topology
	// RaftResumeState, when false, makes NewRaftServer delete the previous
	// "conf" and "snapshot" directories before starting.
	RaftResumeState bool
	// HeartbeatInterval is the base heartbeat; NewRaftServer scales it by a
	// random factor in [1, 1.25) before applying it.
	HeartbeatInterval time.Duration
	// ElectionTimeout is passed to the raft server's SetElectionTimeout.
	ElectionTimeout time.Duration
	// RaftBootstrap is not read by the code in this file; presumably it is
	// consumed by the hashicorp raft setup — TODO confirm.
	RaftBootstrap bool
}
// RaftServer holds the master's raft consensus state. Peers() reads from
// raftServer when it is non-nil and otherwise from RaftHashicorp, so
// presumably only one of the two backends is active at a time — confirm
// against the hashicorp constructor.
type RaftServer struct {
	protobuf.UnimplementedRaftServer
	// raftGrpcServer serves the seaweedfs/raft backend; created in NewRaftServer.
	raftGrpcServer *raft.GrpcServer
	peers          map[string]pb.ServerAddress // initial peers to join with
	// raftServer is the seaweedfs/raft server built by NewRaftServer.
	raftServer raft.Server
	// RaftHashicorp is the hashicorp raft instance; not assigned in this file.
	RaftHashicorp *hashicorpRaft.Raft
	// TransportManager is not used in this file; presumably it belongs to the
	// hashicorp raft gRPC transport — TODO confirm.
	TransportManager *transport.Manager
	dataDir          string
	serverAddr       pb.ServerAddress
	topo             *topology.Topology
}
// StateMachine connects raft to the master topology. The state it saves and
// snapshots is the topology's max volume id, wrapped in a
// topology.MaxVolumeIdCommand.
//
// NewRaftServer leaves the embedded raft.StateMachine interface nil. Save and
// Recovery are defined directly on StateMachine with value receivers, which
// allows a StateMachine value to be passed to raft.NewServer. Apply, Snapshot
// and Restore use pointer receivers and make *StateMachine a hashicorpRaft.FSM.
type StateMachine struct {
	raft.StateMachine
	topo *topology.Topology
}
  44. var _ hashicorpRaft.FSM = &StateMachine{}
  45. func (s StateMachine) Save() ([]byte, error) {
  46. state := topology.MaxVolumeIdCommand{
  47. MaxVolumeId: s.topo.GetMaxVolumeId(),
  48. }
  49. glog.V(1).Infof("Save raft state %+v", state)
  50. return json.Marshal(state)
  51. }
  52. func (s StateMachine) Recovery(data []byte) error {
  53. state := topology.MaxVolumeIdCommand{}
  54. err := json.Unmarshal(data, &state)
  55. if err != nil {
  56. return err
  57. }
  58. glog.V(1).Infof("Recovery raft state %+v", state)
  59. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  60. return nil
  61. }
  62. func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
  63. before := s.topo.GetMaxVolumeId()
  64. state := topology.MaxVolumeIdCommand{}
  65. err := json.Unmarshal(l.Data, &state)
  66. if err != nil {
  67. return err
  68. }
  69. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  70. glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
  71. return nil
  72. }
  73. func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
  74. return &topology.MaxVolumeIdCommand{
  75. MaxVolumeId: s.topo.GetMaxVolumeId(),
  76. }, nil
  77. }
  78. func (s *StateMachine) Restore(r io.ReadCloser) error {
  79. b, err := io.ReadAll(r)
  80. if err != nil {
  81. return err
  82. }
  83. if err := s.Recovery(b); err != nil {
  84. return err
  85. }
  86. return nil
  87. }
  88. func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
  89. s := &RaftServer{
  90. peers: option.Peers,
  91. serverAddr: option.ServerAddr,
  92. dataDir: option.DataDir,
  93. topo: option.Topo,
  94. }
  95. if glog.V(4) {
  96. raft.SetLogLevel(2)
  97. }
  98. raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
  99. var err error
  100. transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
  101. glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
  102. // always clear previous log to avoid server is promotable
  103. os.RemoveAll(path.Join(s.dataDir, "log"))
  104. if !option.RaftResumeState {
  105. // always clear previous metadata
  106. os.RemoveAll(path.Join(s.dataDir, "conf"))
  107. os.RemoveAll(path.Join(s.dataDir, "snapshot"))
  108. }
  109. if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
  110. return nil, err
  111. }
  112. stateMachine := StateMachine{topo: option.Topo}
  113. s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
  114. if err != nil {
  115. glog.V(0).Infoln(err)
  116. return nil, err
  117. }
  118. heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
  119. s.raftServer.SetHeartbeatInterval(heartbeatInterval)
  120. s.raftServer.SetElectionTimeout(option.ElectionTimeout)
  121. if err := s.raftServer.LoadSnapshot(); err != nil {
  122. return nil, err
  123. }
  124. if err := s.raftServer.Start(); err != nil {
  125. return nil, err
  126. }
  127. for name, peer := range s.peers {
  128. if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
  129. return nil, err
  130. }
  131. }
  132. // Remove deleted peers
  133. for existsPeerName := range s.raftServer.Peers() {
  134. if existingPeer, found := s.peers[existsPeerName]; !found {
  135. if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
  136. glog.V(0).Infoln(err)
  137. return nil, err
  138. } else {
  139. glog.V(0).Infof("removing old peer: %s", existingPeer)
  140. }
  141. }
  142. }
  143. s.raftGrpcServer = raft.NewGrpcServer(s.raftServer)
  144. glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
  145. return s, nil
  146. }
  147. func (s *RaftServer) Peers() (members []string) {
  148. if s.raftServer != nil {
  149. peers := s.raftServer.Peers()
  150. for _, p := range peers {
  151. members = append(members, p.Name)
  152. }
  153. } else if s.RaftHashicorp != nil {
  154. cfg := s.RaftHashicorp.GetConfiguration()
  155. for _, p := range cfg.Configuration().Servers {
  156. members = append(members, string(p.ID))
  157. }
  158. }
  159. return
  160. }
  161. func (s *RaftServer) DoJoinCommand() {
  162. glog.V(0).Infoln("Initializing new cluster")
  163. if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
  164. Name: s.raftServer.Name(),
  165. ConnectionString: s.serverAddr.ToGrpcAddress(),
  166. }); err != nil {
  167. glog.Errorf("fail to send join command: %v", err)
  168. }
  169. }