You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

207 lines
5.0 KiB

  1. package election
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. type ClusterNode struct {
  10. Address pb.ServerAddress
  11. Version string
  12. counter int
  13. createdTs time.Time
  14. }
  15. type Leaders struct {
  16. leaders [3]pb.ServerAddress
  17. }
  18. type Cluster struct {
  19. nodes map[pb.ServerAddress]*ClusterNode
  20. nodesLock sync.RWMutex
  21. leaders *Leaders
  22. }
  23. func NewCluster() *Cluster {
  24. return &Cluster{
  25. nodes: make(map[pb.ServerAddress]*ClusterNode),
  26. leaders: &Leaders{},
  27. }
  28. }
  29. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  30. switch nodeType {
  31. case "filer":
  32. cluster.nodesLock.Lock()
  33. defer cluster.nodesLock.Unlock()
  34. if existingNode, found := cluster.nodes[address]; found {
  35. existingNode.counter++
  36. return nil
  37. }
  38. cluster.nodes[address] = &ClusterNode{
  39. Address: address,
  40. Version: version,
  41. counter: 1,
  42. createdTs: time.Now(),
  43. }
  44. return cluster.ensureLeader(true, nodeType, address)
  45. case "master":
  46. }
  47. return nil
  48. }
  49. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  50. switch nodeType {
  51. case "filer":
  52. cluster.nodesLock.Lock()
  53. defer cluster.nodesLock.Unlock()
  54. if existingNode, found := cluster.nodes[address]; !found {
  55. return nil
  56. } else {
  57. existingNode.counter--
  58. if existingNode.counter <= 0 {
  59. delete(cluster.nodes, address)
  60. return cluster.ensureLeader(false, nodeType, address)
  61. }
  62. }
  63. case "master":
  64. }
  65. return nil
  66. }
  67. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  68. switch nodeType {
  69. case "filer":
  70. cluster.nodesLock.RLock()
  71. defer cluster.nodesLock.RUnlock()
  72. for _, node := range cluster.nodes {
  73. nodes = append(nodes, node)
  74. }
  75. case "master":
  76. }
  77. return
  78. }
  79. func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
  80. return cluster.leaders.isOneLeader(address)
  81. }
  82. func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  83. if isAdd {
  84. if cluster.leaders.addLeaderIfVacant(address) {
  85. // has added the address as one leader
  86. result = append(result, &master_pb.KeepConnectedResponse{
  87. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  88. NodeType: nodeType,
  89. Address: string(address),
  90. IsLeader: true,
  91. IsAdd: true,
  92. },
  93. })
  94. } else {
  95. result = append(result, &master_pb.KeepConnectedResponse{
  96. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  97. NodeType: nodeType,
  98. Address: string(address),
  99. IsLeader: false,
  100. IsAdd: true,
  101. },
  102. })
  103. }
  104. } else {
  105. if cluster.leaders.removeLeaderIfExists(address) {
  106. result = append(result, &master_pb.KeepConnectedResponse{
  107. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  108. NodeType: nodeType,
  109. Address: string(address),
  110. IsLeader: true,
  111. IsAdd: false,
  112. },
  113. })
  114. // pick the freshest one, since it is less likely to go away
  115. var shortestDuration int64 = math.MaxInt64
  116. now := time.Now()
  117. var candidateAddress pb.ServerAddress
  118. for _, node := range cluster.nodes {
  119. if cluster.leaders.isOneLeader(node.Address) {
  120. continue
  121. }
  122. duration := now.Sub(node.createdTs).Nanoseconds()
  123. if duration < shortestDuration {
  124. shortestDuration = duration
  125. candidateAddress = node.Address
  126. }
  127. }
  128. if candidateAddress != "" {
  129. cluster.leaders.addLeaderIfVacant(candidateAddress)
  130. // added a new leader
  131. result = append(result, &master_pb.KeepConnectedResponse{
  132. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  133. NodeType: nodeType,
  134. Address: string(candidateAddress),
  135. IsLeader: true,
  136. IsAdd: true,
  137. },
  138. })
  139. }
  140. } else {
  141. result = append(result, &master_pb.KeepConnectedResponse{
  142. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  143. NodeType: nodeType,
  144. Address: string(address),
  145. IsLeader: false,
  146. IsAdd: false,
  147. },
  148. })
  149. }
  150. }
  151. return
  152. }
  153. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  154. if leaders.isOneLeader(address) {
  155. return
  156. }
  157. for i := 0; i < len(leaders.leaders); i++ {
  158. if leaders.leaders[i] == "" {
  159. leaders.leaders[i] = address
  160. hasChanged = true
  161. return
  162. }
  163. }
  164. return
  165. }
  166. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  167. if !leaders.isOneLeader(address) {
  168. return
  169. }
  170. for i := 0; i < len(leaders.leaders); i++ {
  171. if leaders.leaders[i] == address {
  172. leaders.leaders[i] = ""
  173. hasChanged = true
  174. return
  175. }
  176. }
  177. return
  178. }
  179. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  180. for i := 0; i < len(leaders.leaders); i++ {
  181. if leaders.leaders[i] == address {
  182. return true
  183. }
  184. }
  185. return false
  186. }
  187. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  188. for i := 0; i < len(leaders.leaders); i++ {
  189. if leaders.leaders[i] != "" {
  190. addresses = append(addresses, leaders.leaders[i])
  191. }
  192. }
  193. return
  194. }