You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
5.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  // Node type identifiers accepted by the Cluster membership methods.
  //
  // Only FilerType is acted on by AddClusterNode, RemoveClusterNode and
  // ListClusterNode; their MasterType branches are empty.
  const (
  	// MasterType identifies a master node.
  	MasterType = "master"
  	// FilerType identifies a filer node.
  	FilerType = "filer"
  )
  13. type ClusterNode struct {
  14. Address pb.ServerAddress
  15. Version string
  16. counter int
  17. createdTs time.Time
  18. }
  19. type Leaders struct {
  20. leaders [3]pb.ServerAddress
  21. }
  22. type Cluster struct {
  23. nodes map[pb.ServerAddress]*ClusterNode
  24. nodesLock sync.RWMutex
  25. leaders *Leaders
  26. }
  27. func NewCluster() *Cluster {
  28. return &Cluster{
  29. nodes: make(map[pb.ServerAddress]*ClusterNode),
  30. leaders: &Leaders{},
  31. }
  32. }
  33. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  34. switch nodeType {
  35. case FilerType:
  36. cluster.nodesLock.Lock()
  37. defer cluster.nodesLock.Unlock()
  38. if existingNode, found := cluster.nodes[address]; found {
  39. existingNode.counter++
  40. return nil
  41. }
  42. cluster.nodes[address] = &ClusterNode{
  43. Address: address,
  44. Version: version,
  45. counter: 1,
  46. createdTs: time.Now(),
  47. }
  48. return cluster.ensureLeader(true, nodeType, address)
  49. case MasterType:
  50. }
  51. return nil
  52. }
  53. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  54. switch nodeType {
  55. case FilerType:
  56. cluster.nodesLock.Lock()
  57. defer cluster.nodesLock.Unlock()
  58. if existingNode, found := cluster.nodes[address]; !found {
  59. return nil
  60. } else {
  61. existingNode.counter--
  62. if existingNode.counter <= 0 {
  63. delete(cluster.nodes, address)
  64. return cluster.ensureLeader(false, nodeType, address)
  65. }
  66. }
  67. case MasterType:
  68. }
  69. return nil
  70. }
  71. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  72. switch nodeType {
  73. case FilerType:
  74. cluster.nodesLock.RLock()
  75. defer cluster.nodesLock.RUnlock()
  76. for _, node := range cluster.nodes {
  77. nodes = append(nodes, node)
  78. }
  79. case MasterType:
  80. }
  81. return
  82. }
  83. func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
  84. return cluster.leaders.isOneLeader(address)
  85. }
  86. func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  87. if isAdd {
  88. if cluster.leaders.addLeaderIfVacant(address) {
  89. // has added the address as one leader
  90. result = append(result, &master_pb.KeepConnectedResponse{
  91. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  92. NodeType: nodeType,
  93. Address: string(address),
  94. IsLeader: true,
  95. IsAdd: true,
  96. },
  97. })
  98. } else {
  99. result = append(result, &master_pb.KeepConnectedResponse{
  100. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  101. NodeType: nodeType,
  102. Address: string(address),
  103. IsLeader: false,
  104. IsAdd: true,
  105. },
  106. })
  107. }
  108. } else {
  109. if cluster.leaders.removeLeaderIfExists(address) {
  110. result = append(result, &master_pb.KeepConnectedResponse{
  111. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  112. NodeType: nodeType,
  113. Address: string(address),
  114. IsLeader: true,
  115. IsAdd: false,
  116. },
  117. })
  118. // pick the freshest one, since it is less likely to go away
  119. var shortestDuration int64 = math.MaxInt64
  120. now := time.Now()
  121. var candidateAddress pb.ServerAddress
  122. for _, node := range cluster.nodes {
  123. if cluster.leaders.isOneLeader(node.Address) {
  124. continue
  125. }
  126. duration := now.Sub(node.createdTs).Nanoseconds()
  127. if duration < shortestDuration {
  128. shortestDuration = duration
  129. candidateAddress = node.Address
  130. }
  131. }
  132. if candidateAddress != "" {
  133. cluster.leaders.addLeaderIfVacant(candidateAddress)
  134. // added a new leader
  135. result = append(result, &master_pb.KeepConnectedResponse{
  136. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  137. NodeType: nodeType,
  138. Address: string(candidateAddress),
  139. IsLeader: true,
  140. IsAdd: true,
  141. },
  142. })
  143. }
  144. } else {
  145. result = append(result, &master_pb.KeepConnectedResponse{
  146. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  147. NodeType: nodeType,
  148. Address: string(address),
  149. IsLeader: false,
  150. IsAdd: false,
  151. },
  152. })
  153. }
  154. }
  155. return
  156. }
  157. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  158. if leaders.isOneLeader(address) {
  159. return
  160. }
  161. for i := 0; i < len(leaders.leaders); i++ {
  162. if leaders.leaders[i] == "" {
  163. leaders.leaders[i] = address
  164. hasChanged = true
  165. return
  166. }
  167. }
  168. return
  169. }
  170. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  171. if !leaders.isOneLeader(address) {
  172. return
  173. }
  174. for i := 0; i < len(leaders.leaders); i++ {
  175. if leaders.leaders[i] == address {
  176. leaders.leaders[i] = ""
  177. hasChanged = true
  178. return
  179. }
  180. }
  181. return
  182. }
  183. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  184. for i := 0; i < len(leaders.leaders); i++ {
  185. if leaders.leaders[i] == address {
  186. return true
  187. }
  188. }
  189. return false
  190. }
  191. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  192. for i := 0; i < len(leaders.leaders); i++ {
  193. if leaders.leaders[i] != "" {
  194. addresses = append(addresses, leaders.leaders[i])
  195. }
  196. }
  197. return
  198. }