You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

203 lines
4.8 KiB

  1. package election
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. type ClusterNode struct {
  10. Address pb.ServerAddress
  11. Version string
  12. counter int
  13. createdTs time.Time
  14. }
  15. type Leaders struct {
  16. leaders [3]pb.ServerAddress
  17. }
  18. type Cluster struct {
  19. nodes map[pb.ServerAddress]*ClusterNode
  20. nodesLock sync.RWMutex
  21. leaders *Leaders
  22. }
  23. func NewCluster() *Cluster {
  24. return &Cluster{
  25. nodes: make(map[pb.ServerAddress]*ClusterNode),
  26. leaders: &Leaders{},
  27. }
  28. }
  29. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  30. switch nodeType {
  31. case "filer":
  32. cluster.nodesLock.Lock()
  33. defer cluster.nodesLock.Unlock()
  34. if existingNode, found := cluster.nodes[address]; found {
  35. existingNode.counter++
  36. return nil
  37. }
  38. cluster.nodes[address] = &ClusterNode{
  39. Address: address,
  40. Version: version,
  41. counter: 1,
  42. createdTs: time.Now(),
  43. }
  44. return cluster.ensureLeader(true, nodeType, address)
  45. case "master":
  46. }
  47. return nil
  48. }
  49. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  50. switch nodeType {
  51. case "filer":
  52. cluster.nodesLock.Lock()
  53. defer cluster.nodesLock.Unlock()
  54. if existingNode, found := cluster.nodes[address]; !found {
  55. return nil
  56. } else {
  57. existingNode.counter--
  58. if existingNode.counter <= 0 {
  59. delete(cluster.nodes, address)
  60. return cluster.ensureLeader(false, nodeType, address)
  61. }
  62. }
  63. case "master":
  64. }
  65. return nil
  66. }
  67. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  68. switch nodeType {
  69. case "filer":
  70. cluster.nodesLock.RLock()
  71. defer cluster.nodesLock.RUnlock()
  72. for _, node := range cluster.nodes {
  73. nodes = append(nodes, node)
  74. }
  75. case "master":
  76. }
  77. return
  78. }
  79. func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  80. if isAdd {
  81. if cluster.leaders.addLeaderIfVacant(address) {
  82. // has added the address as one leader
  83. result = append(result, &master_pb.KeepConnectedResponse{
  84. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  85. NodeType: nodeType,
  86. Address: string(address),
  87. IsLeader: true,
  88. IsAdd: true,
  89. },
  90. })
  91. } else {
  92. result = append(result, &master_pb.KeepConnectedResponse{
  93. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  94. NodeType: nodeType,
  95. Address: string(address),
  96. IsLeader: false,
  97. IsAdd: true,
  98. },
  99. })
  100. }
  101. } else {
  102. if cluster.leaders.removeLeaderIfExists(address) {
  103. result = append(result, &master_pb.KeepConnectedResponse{
  104. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  105. NodeType: nodeType,
  106. Address: string(address),
  107. IsLeader: true,
  108. IsAdd: false,
  109. },
  110. })
  111. // pick the freshest one, since it is less likely to go away
  112. var shortestDuration int64 = math.MaxInt64
  113. now := time.Now()
  114. var candidateAddress pb.ServerAddress
  115. for _, node := range cluster.nodes {
  116. if cluster.leaders.isOneLeader(node.Address) {
  117. continue
  118. }
  119. duration := now.Sub(node.createdTs).Nanoseconds()
  120. if duration < shortestDuration {
  121. shortestDuration = duration
  122. candidateAddress = node.Address
  123. }
  124. }
  125. if candidateAddress != "" {
  126. cluster.leaders.addLeaderIfVacant(candidateAddress)
  127. // added a new leader
  128. result = append(result, &master_pb.KeepConnectedResponse{
  129. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  130. NodeType: nodeType,
  131. Address: string(candidateAddress),
  132. IsLeader: true,
  133. IsAdd: true,
  134. },
  135. })
  136. }
  137. } else {
  138. result = append(result, &master_pb.KeepConnectedResponse{
  139. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  140. NodeType: nodeType,
  141. Address: string(address),
  142. IsLeader: false,
  143. IsAdd: false,
  144. },
  145. })
  146. }
  147. }
  148. return
  149. }
  150. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  151. if leaders.isOneLeader(address) {
  152. return
  153. }
  154. for i := 0; i < len(leaders.leaders); i++ {
  155. if leaders.leaders[i] == "" {
  156. leaders.leaders[i] = address
  157. hasChanged = true
  158. return
  159. }
  160. }
  161. return
  162. }
  163. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  164. if !leaders.isOneLeader(address) {
  165. return
  166. }
  167. for i := 0; i < len(leaders.leaders); i++ {
  168. if leaders.leaders[i] == address {
  169. leaders.leaders[i] = ""
  170. hasChanged = true
  171. return
  172. }
  173. }
  174. return
  175. }
  176. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  177. for i := 0; i < len(leaders.leaders); i++ {
  178. if leaders.leaders[i] == address {
  179. return true
  180. }
  181. }
  182. return false
  183. }
  184. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  185. for i := 0; i < len(leaders.leaders); i++ {
  186. if leaders.leaders[i] != "" {
  187. addresses = append(addresses, leaders.leaders[i])
  188. }
  189. }
  190. return
  191. }