You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

283 lines
6.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
// Node type identifiers passed as the nodeType argument of the Cluster methods.
// Within this file only FilerType and BrokerType nodes are stored in the
// Cluster maps; MasterType produces notifications without being stored, and
// VolumeServerType is not handled by any switch in this file.
const (
	MasterType       = "master"
	VolumeServerType = "volumeServer"
	FilerType        = "filer"
	BrokerType       = "broker"
)
// ClusterNode is the bookkeeping record kept for one filer or broker address.
type ClusterNode struct {
	Address   pb.ServerAddress
	Version   string    // version supplied when the record was created; not refreshed on repeated adds
	counter   int       // AddClusterNode calls for this address not yet matched by RemoveClusterNode
	createdTs time.Time // creation time of the record; used to pick the freshest filer as a replacement leader
}
// Leaders holds up to three leader addresses in fixed slots.
// An empty address marks a vacant slot.
type Leaders struct {
	leaders [3]pb.ServerAddress
}
// Cluster keeps reference-counted registries of filers and brokers, keyed by
// server address, plus the set of filer leaders.
type Cluster struct {
	filers       map[pb.ServerAddress]*ClusterNode // accessed under filersLock
	filersLock   sync.RWMutex
	filerLeaders *Leaders                          // updated by ensureFilerLeaders, whose callers in this file hold filersLock
	brokers      map[pb.ServerAddress]*ClusterNode // accessed under brokersLock
	brokersLock  sync.RWMutex
}
  31. func NewCluster() *Cluster {
  32. return &Cluster{
  33. filers: make(map[pb.ServerAddress]*ClusterNode),
  34. filerLeaders: &Leaders{},
  35. brokers: make(map[pb.ServerAddress]*ClusterNode),
  36. }
  37. }
  38. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  39. switch nodeType {
  40. case FilerType:
  41. cluster.filersLock.Lock()
  42. defer cluster.filersLock.Unlock()
  43. if existingNode, found := cluster.filers[address]; found {
  44. existingNode.counter++
  45. return nil
  46. }
  47. cluster.filers[address] = &ClusterNode{
  48. Address: address,
  49. Version: version,
  50. counter: 1,
  51. createdTs: time.Now(),
  52. }
  53. return cluster.ensureFilerLeaders(true, nodeType, address)
  54. case BrokerType:
  55. cluster.brokersLock.Lock()
  56. defer cluster.brokersLock.Unlock()
  57. if existingNode, found := cluster.brokers[address]; found {
  58. existingNode.counter++
  59. return nil
  60. }
  61. cluster.brokers[address] = &ClusterNode{
  62. Address: address,
  63. Version: version,
  64. counter: 1,
  65. createdTs: time.Now(),
  66. }
  67. return []*master_pb.KeepConnectedResponse{
  68. {
  69. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  70. NodeType: nodeType,
  71. Address: string(address),
  72. IsAdd: true,
  73. },
  74. },
  75. }
  76. case MasterType:
  77. return []*master_pb.KeepConnectedResponse{
  78. {
  79. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  80. NodeType: nodeType,
  81. Address: string(address),
  82. IsAdd: true,
  83. },
  84. },
  85. }
  86. }
  87. return nil
  88. }
  89. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  90. switch nodeType {
  91. case FilerType:
  92. cluster.filersLock.Lock()
  93. defer cluster.filersLock.Unlock()
  94. if existingNode, found := cluster.filers[address]; !found {
  95. return nil
  96. } else {
  97. existingNode.counter--
  98. if existingNode.counter <= 0 {
  99. delete(cluster.filers, address)
  100. return cluster.ensureFilerLeaders(false, nodeType, address)
  101. }
  102. }
  103. case BrokerType:
  104. cluster.brokersLock.Lock()
  105. defer cluster.brokersLock.Unlock()
  106. if existingNode, found := cluster.brokers[address]; !found {
  107. return nil
  108. } else {
  109. existingNode.counter--
  110. if existingNode.counter <= 0 {
  111. delete(cluster.brokers, address)
  112. return []*master_pb.KeepConnectedResponse{
  113. {
  114. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  115. NodeType: nodeType,
  116. Address: string(address),
  117. IsAdd: false,
  118. },
  119. },
  120. }
  121. }
  122. }
  123. case MasterType:
  124. return []*master_pb.KeepConnectedResponse{
  125. {
  126. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  127. NodeType: nodeType,
  128. Address: string(address),
  129. IsAdd: false,
  130. },
  131. },
  132. }
  133. }
  134. return nil
  135. }
  136. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  137. switch nodeType {
  138. case FilerType:
  139. cluster.filersLock.RLock()
  140. defer cluster.filersLock.RUnlock()
  141. for _, node := range cluster.filers {
  142. nodes = append(nodes, node)
  143. }
  144. case BrokerType:
  145. cluster.brokersLock.RLock()
  146. defer cluster.brokersLock.RUnlock()
  147. for _, node := range cluster.brokers {
  148. nodes = append(nodes, node)
  149. }
  150. case MasterType:
  151. }
  152. return
  153. }
  154. func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
  155. return cluster.filerLeaders.isOneLeader(address)
  156. }
  157. func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  158. if isAdd {
  159. if cluster.filerLeaders.addLeaderIfVacant(address) {
  160. // has added the address as one leader
  161. result = append(result, &master_pb.KeepConnectedResponse{
  162. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  163. NodeType: nodeType,
  164. Address: string(address),
  165. IsLeader: true,
  166. IsAdd: true,
  167. },
  168. })
  169. } else {
  170. result = append(result, &master_pb.KeepConnectedResponse{
  171. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  172. NodeType: nodeType,
  173. Address: string(address),
  174. IsLeader: false,
  175. IsAdd: true,
  176. },
  177. })
  178. }
  179. } else {
  180. if cluster.filerLeaders.removeLeaderIfExists(address) {
  181. result = append(result, &master_pb.KeepConnectedResponse{
  182. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  183. NodeType: nodeType,
  184. Address: string(address),
  185. IsLeader: true,
  186. IsAdd: false,
  187. },
  188. })
  189. // pick the freshest one, since it is less likely to go away
  190. var shortestDuration int64 = math.MaxInt64
  191. now := time.Now()
  192. var candidateAddress pb.ServerAddress
  193. for _, node := range cluster.filers {
  194. if cluster.filerLeaders.isOneLeader(node.Address) {
  195. continue
  196. }
  197. duration := now.Sub(node.createdTs).Nanoseconds()
  198. if duration < shortestDuration {
  199. shortestDuration = duration
  200. candidateAddress = node.Address
  201. }
  202. }
  203. if candidateAddress != "" {
  204. cluster.filerLeaders.addLeaderIfVacant(candidateAddress)
  205. // added a new leader
  206. result = append(result, &master_pb.KeepConnectedResponse{
  207. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  208. NodeType: nodeType,
  209. Address: string(candidateAddress),
  210. IsLeader: true,
  211. IsAdd: true,
  212. },
  213. })
  214. }
  215. } else {
  216. result = append(result, &master_pb.KeepConnectedResponse{
  217. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  218. NodeType: nodeType,
  219. Address: string(address),
  220. IsLeader: false,
  221. IsAdd: false,
  222. },
  223. })
  224. }
  225. }
  226. return
  227. }
  228. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  229. if leaders.isOneLeader(address) {
  230. return
  231. }
  232. for i := 0; i < len(leaders.leaders); i++ {
  233. if leaders.leaders[i] == "" {
  234. leaders.leaders[i] = address
  235. hasChanged = true
  236. return
  237. }
  238. }
  239. return
  240. }
  241. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  242. if !leaders.isOneLeader(address) {
  243. return
  244. }
  245. for i := 0; i < len(leaders.leaders); i++ {
  246. if leaders.leaders[i] == address {
  247. leaders.leaders[i] = ""
  248. hasChanged = true
  249. return
  250. }
  251. }
  252. return
  253. }
  254. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  255. for i := 0; i < len(leaders.leaders); i++ {
  256. if leaders.leaders[i] == address {
  257. return true
  258. }
  259. }
  260. return false
  261. }
  262. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  263. for i := 0; i < len(leaders.leaders); i++ {
  264. if leaders.leaders[i] != "" {
  265. addresses = append(addresses, leaders.leaders[i])
  266. }
  267. }
  268. return
  269. }