You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

315 lines
7.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type FilerGroup string
  16. type Filers struct {
  17. filers map[pb.ServerAddress]*ClusterNode
  18. leaders *Leaders
  19. }
  20. type Leaders struct {
  21. leaders [3]pb.ServerAddress
  22. }
  23. type ClusterNode struct {
  24. Address pb.ServerAddress
  25. Version string
  26. counter int
  27. createdTs time.Time
  28. }
  29. type Cluster struct {
  30. filerGroup2filers map[FilerGroup]*Filers
  31. filersLock sync.RWMutex
  32. brokers map[pb.ServerAddress]*ClusterNode
  33. brokersLock sync.RWMutex
  34. }
  35. func NewCluster() *Cluster {
  36. return &Cluster{
  37. filerGroup2filers: make(map[FilerGroup]*Filers),
  38. brokers: make(map[pb.ServerAddress]*ClusterNode),
  39. }
  40. }
  41. func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
  42. cluster.filersLock.Lock()
  43. defer cluster.filersLock.Unlock()
  44. filers, found := cluster.filerGroup2filers[filerGroup]
  45. if !found && createIfNotFound {
  46. filers = &Filers{
  47. filers: make(map[pb.ServerAddress]*ClusterNode),
  48. leaders: &Leaders{},
  49. }
  50. cluster.filerGroup2filers[filerGroup] = filers
  51. }
  52. return filers
  53. }
  54. func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  55. filerGroup := FilerGroup(ns)
  56. switch nodeType {
  57. case FilerType:
  58. filers := cluster.getFilers(filerGroup, true)
  59. if existingNode, found := filers.filers[address]; found {
  60. existingNode.counter++
  61. return nil
  62. }
  63. filers.filers[address] = &ClusterNode{
  64. Address: address,
  65. Version: version,
  66. counter: 1,
  67. createdTs: time.Now(),
  68. }
  69. return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
  70. case BrokerType:
  71. cluster.brokersLock.Lock()
  72. defer cluster.brokersLock.Unlock()
  73. if existingNode, found := cluster.brokers[address]; found {
  74. existingNode.counter++
  75. return nil
  76. }
  77. cluster.brokers[address] = &ClusterNode{
  78. Address: address,
  79. Version: version,
  80. counter: 1,
  81. createdTs: time.Now(),
  82. }
  83. return []*master_pb.KeepConnectedResponse{
  84. {
  85. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  86. NodeType: nodeType,
  87. Address: string(address),
  88. IsAdd: true,
  89. },
  90. },
  91. }
  92. case MasterType:
  93. return []*master_pb.KeepConnectedResponse{
  94. {
  95. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  96. NodeType: nodeType,
  97. Address: string(address),
  98. IsAdd: true,
  99. },
  100. },
  101. }
  102. }
  103. return nil
  104. }
  105. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  106. filerGroup := FilerGroup(ns)
  107. switch nodeType {
  108. case FilerType:
  109. filers := cluster.getFilers(filerGroup, false)
  110. if filers == nil {
  111. return nil
  112. }
  113. if existingNode, found := filers.filers[address]; !found {
  114. return nil
  115. } else {
  116. existingNode.counter--
  117. if existingNode.counter <= 0 {
  118. delete(filers.filers, address)
  119. return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
  120. }
  121. }
  122. case BrokerType:
  123. cluster.brokersLock.Lock()
  124. defer cluster.brokersLock.Unlock()
  125. if existingNode, found := cluster.brokers[address]; !found {
  126. return nil
  127. } else {
  128. existingNode.counter--
  129. if existingNode.counter <= 0 {
  130. delete(cluster.brokers, address)
  131. return []*master_pb.KeepConnectedResponse{
  132. {
  133. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  134. NodeType: nodeType,
  135. Address: string(address),
  136. IsAdd: false,
  137. },
  138. },
  139. }
  140. }
  141. }
  142. case MasterType:
  143. return []*master_pb.KeepConnectedResponse{
  144. {
  145. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  146. NodeType: nodeType,
  147. Address: string(address),
  148. IsAdd: false,
  149. },
  150. },
  151. }
  152. }
  153. return nil
  154. }
  155. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
  156. switch nodeType {
  157. case FilerType:
  158. filers := cluster.getFilers(filerGroup, false)
  159. if filers == nil {
  160. return
  161. }
  162. cluster.filersLock.RLock()
  163. defer cluster.filersLock.RUnlock()
  164. for _, node := range filers.filers {
  165. nodes = append(nodes, node)
  166. }
  167. case BrokerType:
  168. cluster.brokersLock.RLock()
  169. defer cluster.brokersLock.RUnlock()
  170. for _, node := range cluster.brokers {
  171. nodes = append(nodes, node)
  172. }
  173. case MasterType:
  174. }
  175. return
  176. }
  177. func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool {
  178. filers := cluster.getFilers(filerGroup, false)
  179. if filers == nil {
  180. return false
  181. }
  182. return filers.leaders.isOneLeader(address)
  183. }
  184. func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  185. if isAdd {
  186. if filers.leaders.addLeaderIfVacant(address) {
  187. // has added the address as one leader
  188. result = append(result, &master_pb.KeepConnectedResponse{
  189. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  190. FilerGroup: string(filerGroup),
  191. NodeType: nodeType,
  192. Address: string(address),
  193. IsLeader: true,
  194. IsAdd: true,
  195. },
  196. })
  197. } else {
  198. result = append(result, &master_pb.KeepConnectedResponse{
  199. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  200. FilerGroup: string(filerGroup),
  201. NodeType: nodeType,
  202. Address: string(address),
  203. IsLeader: false,
  204. IsAdd: true,
  205. },
  206. })
  207. }
  208. } else {
  209. if filers.leaders.removeLeaderIfExists(address) {
  210. result = append(result, &master_pb.KeepConnectedResponse{
  211. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  212. FilerGroup: string(filerGroup),
  213. NodeType: nodeType,
  214. Address: string(address),
  215. IsLeader: true,
  216. IsAdd: false,
  217. },
  218. })
  219. // pick the freshest one, since it is less likely to go away
  220. var shortestDuration int64 = math.MaxInt64
  221. now := time.Now()
  222. var candidateAddress pb.ServerAddress
  223. for _, node := range filers.filers {
  224. if filers.leaders.isOneLeader(node.Address) {
  225. continue
  226. }
  227. duration := now.Sub(node.createdTs).Nanoseconds()
  228. if duration < shortestDuration {
  229. shortestDuration = duration
  230. candidateAddress = node.Address
  231. }
  232. }
  233. if candidateAddress != "" {
  234. filers.leaders.addLeaderIfVacant(candidateAddress)
  235. // added a new leader
  236. result = append(result, &master_pb.KeepConnectedResponse{
  237. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  238. NodeType: nodeType,
  239. Address: string(candidateAddress),
  240. IsLeader: true,
  241. IsAdd: true,
  242. },
  243. })
  244. }
  245. } else {
  246. result = append(result, &master_pb.KeepConnectedResponse{
  247. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  248. FilerGroup: string(filerGroup),
  249. NodeType: nodeType,
  250. Address: string(address),
  251. IsLeader: false,
  252. IsAdd: false,
  253. },
  254. })
  255. }
  256. }
  257. return
  258. }
  259. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  260. if leaders.isOneLeader(address) {
  261. return
  262. }
  263. for i := 0; i < len(leaders.leaders); i++ {
  264. if leaders.leaders[i] == "" {
  265. leaders.leaders[i] = address
  266. hasChanged = true
  267. return
  268. }
  269. }
  270. return
  271. }
  272. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  273. if !leaders.isOneLeader(address) {
  274. return
  275. }
  276. for i := 0; i < len(leaders.leaders); i++ {
  277. if leaders.leaders[i] == address {
  278. leaders.leaders[i] = ""
  279. hasChanged = true
  280. return
  281. }
  282. }
  283. return
  284. }
  285. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  286. for i := 0; i < len(leaders.leaders); i++ {
  287. if leaders.leaders[i] == address {
  288. return true
  289. }
  290. }
  291. return false
  292. }
  293. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  294. for i := 0; i < len(leaders.leaders); i++ {
  295. if leaders.leaders[i] != "" {
  296. addresses = append(addresses, leaders.leaders[i])
  297. }
  298. }
  299. return
  300. }