You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

265 lines
6.5 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type ClusterNode struct {
  16. Address pb.ServerAddress
  17. Version string
  18. counter int
  19. createdTs time.Time
  20. }
  21. type Leaders struct {
  22. leaders [3]pb.ServerAddress
  23. }
  24. type Cluster struct {
  25. filers map[pb.ServerAddress]*ClusterNode
  26. filersLock sync.RWMutex
  27. filerLeaders *Leaders
  28. brokers map[pb.ServerAddress]*ClusterNode
  29. brokersLock sync.RWMutex
  30. }
  31. func NewCluster() *Cluster {
  32. return &Cluster{
  33. filers: make(map[pb.ServerAddress]*ClusterNode),
  34. filerLeaders: &Leaders{},
  35. brokers: make(map[pb.ServerAddress]*ClusterNode),
  36. }
  37. }
  38. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  39. switch nodeType {
  40. case FilerType:
  41. cluster.filersLock.Lock()
  42. defer cluster.filersLock.Unlock()
  43. if existingNode, found := cluster.filers[address]; found {
  44. existingNode.counter++
  45. return nil
  46. }
  47. cluster.filers[address] = &ClusterNode{
  48. Address: address,
  49. Version: version,
  50. counter: 1,
  51. createdTs: time.Now(),
  52. }
  53. return cluster.ensureFilerLeaders(true, nodeType, address)
  54. case BrokerType:
  55. cluster.brokersLock.Lock()
  56. defer cluster.brokersLock.Unlock()
  57. if existingNode, found := cluster.brokers[address]; found {
  58. existingNode.counter++
  59. return nil
  60. }
  61. cluster.brokers[address] = &ClusterNode{
  62. Address: address,
  63. Version: version,
  64. counter: 1,
  65. createdTs: time.Now(),
  66. }
  67. return []*master_pb.KeepConnectedResponse{
  68. {
  69. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  70. NodeType: nodeType,
  71. Address: string(address),
  72. IsAdd: true,
  73. },
  74. },
  75. }
  76. case MasterType:
  77. }
  78. return nil
  79. }
  80. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  81. switch nodeType {
  82. case FilerType:
  83. cluster.filersLock.Lock()
  84. defer cluster.filersLock.Unlock()
  85. if existingNode, found := cluster.filers[address]; !found {
  86. return nil
  87. } else {
  88. existingNode.counter--
  89. if existingNode.counter <= 0 {
  90. delete(cluster.filers, address)
  91. return cluster.ensureFilerLeaders(false, nodeType, address)
  92. }
  93. }
  94. case BrokerType:
  95. cluster.brokersLock.Lock()
  96. defer cluster.brokersLock.Unlock()
  97. if existingNode, found := cluster.brokers[address]; !found {
  98. return nil
  99. } else {
  100. existingNode.counter--
  101. if existingNode.counter <= 0 {
  102. delete(cluster.brokers, address)
  103. return []*master_pb.KeepConnectedResponse{
  104. {
  105. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  106. NodeType: nodeType,
  107. Address: string(address),
  108. IsAdd: false,
  109. },
  110. },
  111. }
  112. }
  113. }
  114. case MasterType:
  115. }
  116. return nil
  117. }
  118. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  119. switch nodeType {
  120. case FilerType:
  121. cluster.filersLock.RLock()
  122. defer cluster.filersLock.RUnlock()
  123. for _, node := range cluster.filers {
  124. nodes = append(nodes, node)
  125. }
  126. case BrokerType:
  127. cluster.brokersLock.RLock()
  128. defer cluster.brokersLock.RUnlock()
  129. for _, node := range cluster.brokers {
  130. nodes = append(nodes, node)
  131. }
  132. case MasterType:
  133. }
  134. return
  135. }
  136. func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
  137. return cluster.filerLeaders.isOneLeader(address)
  138. }
  139. func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  140. if isAdd {
  141. if cluster.filerLeaders.addLeaderIfVacant(address) {
  142. // has added the address as one leader
  143. result = append(result, &master_pb.KeepConnectedResponse{
  144. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  145. NodeType: nodeType,
  146. Address: string(address),
  147. IsLeader: true,
  148. IsAdd: true,
  149. },
  150. })
  151. } else {
  152. result = append(result, &master_pb.KeepConnectedResponse{
  153. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  154. NodeType: nodeType,
  155. Address: string(address),
  156. IsLeader: false,
  157. IsAdd: true,
  158. },
  159. })
  160. }
  161. } else {
  162. if cluster.filerLeaders.removeLeaderIfExists(address) {
  163. result = append(result, &master_pb.KeepConnectedResponse{
  164. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  165. NodeType: nodeType,
  166. Address: string(address),
  167. IsLeader: true,
  168. IsAdd: false,
  169. },
  170. })
  171. // pick the freshest one, since it is less likely to go away
  172. var shortestDuration int64 = math.MaxInt64
  173. now := time.Now()
  174. var candidateAddress pb.ServerAddress
  175. for _, node := range cluster.filers {
  176. if cluster.filerLeaders.isOneLeader(node.Address) {
  177. continue
  178. }
  179. duration := now.Sub(node.createdTs).Nanoseconds()
  180. if duration < shortestDuration {
  181. shortestDuration = duration
  182. candidateAddress = node.Address
  183. }
  184. }
  185. if candidateAddress != "" {
  186. cluster.filerLeaders.addLeaderIfVacant(candidateAddress)
  187. // added a new leader
  188. result = append(result, &master_pb.KeepConnectedResponse{
  189. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  190. NodeType: nodeType,
  191. Address: string(candidateAddress),
  192. IsLeader: true,
  193. IsAdd: true,
  194. },
  195. })
  196. }
  197. } else {
  198. result = append(result, &master_pb.KeepConnectedResponse{
  199. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  200. NodeType: nodeType,
  201. Address: string(address),
  202. IsLeader: false,
  203. IsAdd: false,
  204. },
  205. })
  206. }
  207. }
  208. return
  209. }
  210. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  211. if leaders.isOneLeader(address) {
  212. return
  213. }
  214. for i := 0; i < len(leaders.leaders); i++ {
  215. if leaders.leaders[i] == "" {
  216. leaders.leaders[i] = address
  217. hasChanged = true
  218. return
  219. }
  220. }
  221. return
  222. }
  223. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  224. if !leaders.isOneLeader(address) {
  225. return
  226. }
  227. for i := 0; i < len(leaders.leaders); i++ {
  228. if leaders.leaders[i] == address {
  229. leaders.leaders[i] = ""
  230. hasChanged = true
  231. return
  232. }
  233. }
  234. return
  235. }
  236. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  237. for i := 0; i < len(leaders.leaders); i++ {
  238. if leaders.leaders[i] == address {
  239. return true
  240. }
  241. }
  242. return false
  243. }
  244. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  245. for i := 0; i < len(leaders.leaders); i++ {
  246. if leaders.leaders[i] != "" {
  247. addresses = append(addresses, leaders.leaders[i])
  248. }
  249. }
  250. return
  251. }