You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

155 lines
3.5 KiB

  1. package election
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "math"
  5. "sync"
  6. "time"
  7. )
// ClusterNode describes one node registered with a Cluster.
type ClusterNode struct {
	// Address is the node's server address; Cluster uses it as the map key.
	Address pb.ServerAddress
	// Version is the version string supplied when the node was first added.
	Version string
	// counter is the registration count for this address: AddClusterNode
	// increments it and RemoveClusterNode decrements it, dropping the node
	// once it reaches zero or below.
	counter int
	// createdTs is the time the node was first added; ensureLeader uses it
	// to pick the most recently added node as a replacement leader.
	createdTs time.Time
}
// Leaders holds the addresses of the current leaders in a fixed set of
// three slots.
type Leaders struct {
	// leaders stores one address per slot; an empty string marks a vacant slot.
	leaders [3]pb.ServerAddress
}
// Cluster tracks the registered nodes and the leaders chosen among them.
type Cluster struct {
	// nodes maps each registered address to its node record.
	nodes map[pb.ServerAddress]*ClusterNode
	// nodesLock guards nodes. AddClusterNode and RemoveClusterNode also call
	// ensureLeader while holding the write lock.
	nodesLock sync.RWMutex
	// leaders is the leader set maintained by ensureLeader.
	leaders *Leaders
}
  22. func NewCluster() *Cluster {
  23. return &Cluster{
  24. nodes: make(map[pb.ServerAddress]*ClusterNode),
  25. leaders: &Leaders{},
  26. }
  27. }
  28. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
  29. switch nodeType {
  30. case "filer":
  31. cluster.nodesLock.Lock()
  32. defer cluster.nodesLock.Unlock()
  33. if existingNode, found := cluster.nodes[address]; found {
  34. existingNode.counter++
  35. return
  36. }
  37. cluster.nodes[address] = &ClusterNode{
  38. Address: address,
  39. Version: version,
  40. counter: 1,
  41. createdTs: time.Now(),
  42. }
  43. cluster.ensureLeader(true, address)
  44. case "master":
  45. }
  46. }
  47. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
  48. switch nodeType {
  49. case "filer":
  50. cluster.nodesLock.Lock()
  51. defer cluster.nodesLock.Unlock()
  52. if existingNode, found := cluster.nodes[address]; !found {
  53. return
  54. } else {
  55. existingNode.counter--
  56. if existingNode.counter <= 0 {
  57. delete(cluster.nodes, address)
  58. cluster.ensureLeader(false, address)
  59. }
  60. }
  61. case "master":
  62. }
  63. }
  64. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  65. switch nodeType {
  66. case "filer":
  67. cluster.nodesLock.RLock()
  68. defer cluster.nodesLock.RUnlock()
  69. for _, node := range cluster.nodes {
  70. nodes = append(nodes, node)
  71. }
  72. case "master":
  73. }
  74. return
  75. }
  76. func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
  77. if isAdd {
  78. if cluster.leaders.addLeaderIfVacant(address) {
  79. // has added the address as one leader
  80. }
  81. } else {
  82. if cluster.leaders.removeLeaderIfExists(address) {
  83. // pick the freshest one, since it is less likely to go away
  84. var shortestDuration int64 = math.MaxInt64
  85. now := time.Now()
  86. var candidateAddress pb.ServerAddress
  87. for _, node := range cluster.nodes {
  88. if cluster.leaders.isOneLeader(node.Address) {
  89. continue
  90. }
  91. duration := now.Sub(node.createdTs).Nanoseconds()
  92. if duration < shortestDuration {
  93. shortestDuration = duration
  94. candidateAddress = node.Address
  95. }
  96. }
  97. if candidateAddress != "" {
  98. cluster.leaders.addLeaderIfVacant(candidateAddress)
  99. }
  100. // removed the leader, and maybe added a new leader
  101. }
  102. }
  103. }
  104. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  105. if leaders.isOneLeader(address) {
  106. return
  107. }
  108. for i := 0; i < len(leaders.leaders); i++ {
  109. if leaders.leaders[i] == "" {
  110. leaders.leaders[i] = address
  111. hasChanged = true
  112. return
  113. }
  114. }
  115. return
  116. }
  117. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  118. if !leaders.isOneLeader(address) {
  119. return
  120. }
  121. for i := 0; i < len(leaders.leaders); i++ {
  122. if leaders.leaders[i] == address {
  123. leaders.leaders[i] = ""
  124. hasChanged = true
  125. return
  126. }
  127. }
  128. return
  129. }
  130. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  131. for i := 0; i < len(leaders.leaders); i++ {
  132. if leaders.leaders[i] == address {
  133. return true
  134. }
  135. }
  136. return false
  137. }
  138. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  139. for i := 0; i < len(leaders.leaders); i++ {
  140. if leaders.leaders[i] != "" {
  141. addresses = append(addresses, leaders.leaders[i])
  142. }
  143. }
  144. return
  145. }