You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

194 lines
4.7 KiB

5 years ago
5 years ago
3 years ago
3 years ago
11 years ago
3 years ago
3 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. transport "github.com/Jille/raft-grpc-transport"
  5. "io"
  6. "io/ioutil"
  7. "math/rand"
  8. "os"
  9. "path"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/raft"
  14. hashicorpRaft "github.com/hashicorp/raft"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/topology"
  17. )
  18. type RaftServerOption struct {
  19. GrpcDialOption grpc.DialOption
  20. Peers map[string]pb.ServerAddress
  21. ServerAddr pb.ServerAddress
  22. DataDir string
  23. Topo *topology.Topology
  24. RaftResumeState bool
  25. HeartbeatInterval time.Duration
  26. ElectionTimeout time.Duration
  27. RaftBootstrap bool
  28. }
  29. type RaftServer struct {
  30. peers map[string]pb.ServerAddress // initial peers to join with
  31. raftServer raft.Server
  32. raftHashicorp hashicorpRaft.Raft
  33. TransportManager *transport.Manager
  34. dataDir string
  35. serverAddr pb.ServerAddress
  36. topo *topology.Topology
  37. *raft.GrpcServer
  38. }
  39. type StateMachine struct {
  40. raft.StateMachine
  41. topo *topology.Topology
  42. }
  43. var _ hashicorpRaft.FSM = &StateMachine{}
  // Save returns the topology's current max volume id, JSON-encoded as a
  // topology.MaxVolumeIdCommand, for the chrislusf/raft server.
  func (s StateMachine) Save() ([]byte, error) {
  	current := s.topo.GetMaxVolumeId()
  	snapshot := topology.MaxVolumeIdCommand{MaxVolumeId: current}
  	glog.V(1).Infof("Save raft state %+v", snapshot)
  	return json.Marshal(snapshot)
  }
  51. func (s StateMachine) Recovery(data []byte) error {
  52. state := topology.MaxVolumeIdCommand{}
  53. err := json.Unmarshal(data, &state)
  54. if err != nil {
  55. return err
  56. }
  57. glog.V(1).Infof("Recovery raft state %+v", state)
  58. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  59. return nil
  60. }
  61. func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
  62. before := s.topo.GetMaxVolumeId()
  63. state := topology.MaxVolumeIdCommand{}
  64. err := json.Unmarshal(l.Data, &state)
  65. if err != nil {
  66. return err
  67. }
  68. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  69. glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
  70. return nil
  71. }
  72. func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
  73. return &topology.MaxVolumeIdCommand{
  74. MaxVolumeId: s.topo.GetMaxVolumeId(),
  75. }, nil
  76. }
  // Restore implements hashicorpRaft.FSM. It reads the whole snapshot stream r
  // and applies it through Recovery, which raises the topology's max volume
  // id. The stream is closed before returning. It returns the read error or
  // Recovery's decoding error.
  func (s *StateMachine) Restore(r io.ReadCloser) error {
  	// Close the stream here, as hashicorp/raft's reference FSM does, so it is
  	// released even on the error paths. The Close error is ignored: any data
  	// problem has already surfaced from ReadAll.
  	defer r.Close()

  	b, err := ioutil.ReadAll(r)
  	if err != nil {
  		return err
  	}
  	return s.Recovery(b)
  }
  // NewRaftServer builds the master's chrislusf/raft server from option,
  // starts it, and reconciles its peer list with option.Peers.
  //
  // Startup steps:
  //   - The "log" directory under option.DataDir is always removed. "conf" and
  //     "snapshot" are also removed unless option.RaftResumeState is set.
  //     Removal is best effort: failures are logged, not returned.
  //   - The heartbeat interval is option.HeartbeatInterval scaled by a random
  //     factor in [1, 1.25).
  //   - Every peer in option.Peers is added, and any peer the raft server
  //     knows that is not in option.Peers is removed.
  //
  // It returns an error if the snapshot directory cannot be created, or if
  // creating, loading, starting, or updating the peers of the raft server fails.
  func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
  	s := &RaftServer{
  		peers:      option.Peers,
  		serverAddr: option.ServerAddr,
  		dataDir:    option.DataDir,
  		topo:       option.Topo,
  	}

  	if glog.V(4) {
  		raft.SetLogLevel(2)
  	}

  	raft.RegisterCommand(&topology.MaxVolumeIdCommand{})

  	var err error
  	transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
  	glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)

  	// Always clear the previous log so this server does not come up
  	// promotable. Without RaftResumeState, also drop the previous metadata.
  	staleDirs := []string{"log"}
  	if !option.RaftResumeState {
  		staleDirs = append(staleDirs, "conf", "snapshot")
  	}
  	for _, dir := range staleDirs {
  		if rmErr := os.RemoveAll(path.Join(s.dataDir, dir)); rmErr != nil {
  			// Best effort, as before: report the failure but keep starting.
  			glog.Warningf("failed to clear raft %s directory: %v", dir, rmErr)
  		}
  	}

  	if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
  		return nil, err
  	}

  	stateMachine := StateMachine{topo: option.Topo}
  	s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
  	if err != nil {
  		glog.V(0).Infoln(err)
  		return nil, err
  	}

  	// Jitter the heartbeat by up to +25%.
  	heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
  	s.raftServer.SetHeartbeatInterval(heartbeatInterval)
  	s.raftServer.SetElectionTimeout(option.ElectionTimeout)

  	if err := s.raftServer.LoadSnapshot(); err != nil {
  		return nil, err
  	}
  	if err := s.raftServer.Start(); err != nil {
  		return nil, err
  	}

  	for name, peer := range s.peers {
  		if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
  			return nil, err
  		}
  	}

  	// Remove peers that are no longer part of the configured cluster.
  	for name := range s.raftServer.Peers() {
  		if _, keep := s.peers[name]; keep {
  			continue
  		}
  		if err := s.raftServer.RemovePeer(name); err != nil {
  			glog.V(0).Infoln(err)
  			return nil, err
  		}
  		// Log the removed peer's name. The old code logged the result of the
  		// failed map lookup, which is always the empty zero value.
  		glog.V(0).Infof("removing old peer: %s", name)
  	}

  	s.GrpcServer = raft.NewGrpcServer(s.raftServer)
  	glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
  	return s, nil
  }
  146. func (s *RaftServer) Peers() (members []string) {
  147. peers := s.raftServer.Peers()
  148. for _, p := range peers {
  149. members = append(members, p.Name)
  150. }
  151. return
  152. }
  153. func (s *RaftServer) DoJoinCommand() {
  154. glog.V(0).Infoln("Initializing new cluster")
  155. if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
  156. Name: s.raftServer.Name(),
  157. ConnectionString: s.serverAddr.ToGrpcAddress(),
  158. }); err != nil {
  159. glog.Errorf("fail to send join command: %v", err)
  160. }
  161. }