You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

317 lines
7.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type FilerGroup string
  16. type Filers struct {
  17. filers map[pb.ServerAddress]*ClusterNode
  18. leaders *Leaders
  19. }
  20. type Leaders struct {
  21. leaders [3]pb.ServerAddress
  22. }
  23. type ClusterNode struct {
  24. Address pb.ServerAddress
  25. Version string
  26. counter int
  27. CreatedTs time.Time
  28. }
  29. type Cluster struct {
  30. filerGroup2filers map[FilerGroup]*Filers
  31. filersLock sync.RWMutex
  32. brokers map[pb.ServerAddress]*ClusterNode
  33. brokersLock sync.RWMutex
  34. }
  35. func NewCluster() *Cluster {
  36. return &Cluster{
  37. filerGroup2filers: make(map[FilerGroup]*Filers),
  38. brokers: make(map[pb.ServerAddress]*ClusterNode),
  39. }
  40. }
  41. func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
  42. filers, found := cluster.filerGroup2filers[filerGroup]
  43. if !found && createIfNotFound {
  44. filers = &Filers{
  45. filers: make(map[pb.ServerAddress]*ClusterNode),
  46. leaders: &Leaders{},
  47. }
  48. cluster.filerGroup2filers[filerGroup] = filers
  49. }
  50. return filers
  51. }
  52. func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  53. filerGroup := FilerGroup(ns)
  54. switch nodeType {
  55. case FilerType:
  56. cluster.filersLock.Lock()
  57. defer cluster.filersLock.Unlock()
  58. filers := cluster.getFilers(filerGroup, true)
  59. if existingNode, found := filers.filers[address]; found {
  60. existingNode.counter++
  61. return nil
  62. }
  63. filers.filers[address] = &ClusterNode{
  64. Address: address,
  65. Version: version,
  66. counter: 1,
  67. CreatedTs: time.Now(),
  68. }
  69. return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
  70. case BrokerType:
  71. cluster.brokersLock.Lock()
  72. defer cluster.brokersLock.Unlock()
  73. if existingNode, found := cluster.brokers[address]; found {
  74. existingNode.counter++
  75. return nil
  76. }
  77. cluster.brokers[address] = &ClusterNode{
  78. Address: address,
  79. Version: version,
  80. counter: 1,
  81. CreatedTs: time.Now(),
  82. }
  83. return []*master_pb.KeepConnectedResponse{
  84. {
  85. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  86. NodeType: nodeType,
  87. Address: string(address),
  88. IsAdd: true,
  89. },
  90. },
  91. }
  92. case MasterType:
  93. return []*master_pb.KeepConnectedResponse{
  94. {
  95. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  96. NodeType: nodeType,
  97. Address: string(address),
  98. IsAdd: true,
  99. },
  100. },
  101. }
  102. }
  103. return nil
  104. }
  105. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  106. filerGroup := FilerGroup(ns)
  107. switch nodeType {
  108. case FilerType:
  109. cluster.filersLock.Lock()
  110. defer cluster.filersLock.Unlock()
  111. filers := cluster.getFilers(filerGroup, false)
  112. if filers == nil {
  113. return nil
  114. }
  115. if existingNode, found := filers.filers[address]; !found {
  116. return nil
  117. } else {
  118. existingNode.counter--
  119. if existingNode.counter <= 0 {
  120. delete(filers.filers, address)
  121. return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
  122. }
  123. }
  124. case BrokerType:
  125. cluster.brokersLock.Lock()
  126. defer cluster.brokersLock.Unlock()
  127. if existingNode, found := cluster.brokers[address]; !found {
  128. return nil
  129. } else {
  130. existingNode.counter--
  131. if existingNode.counter <= 0 {
  132. delete(cluster.brokers, address)
  133. return []*master_pb.KeepConnectedResponse{
  134. {
  135. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  136. NodeType: nodeType,
  137. Address: string(address),
  138. IsAdd: false,
  139. },
  140. },
  141. }
  142. }
  143. }
  144. case MasterType:
  145. return []*master_pb.KeepConnectedResponse{
  146. {
  147. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  148. NodeType: nodeType,
  149. Address: string(address),
  150. IsAdd: false,
  151. },
  152. },
  153. }
  154. }
  155. return nil
  156. }
  157. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
  158. switch nodeType {
  159. case FilerType:
  160. cluster.filersLock.RLock()
  161. defer cluster.filersLock.RUnlock()
  162. filers := cluster.getFilers(filerGroup, false)
  163. if filers == nil {
  164. return
  165. }
  166. for _, node := range filers.filers {
  167. nodes = append(nodes, node)
  168. }
  169. case BrokerType:
  170. cluster.brokersLock.RLock()
  171. defer cluster.brokersLock.RUnlock()
  172. for _, node := range cluster.brokers {
  173. nodes = append(nodes, node)
  174. }
  175. case MasterType:
  176. }
  177. return
  178. }
  179. func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool {
  180. filers := cluster.getFilers(filerGroup, false)
  181. if filers == nil {
  182. return false
  183. }
  184. return filers.leaders.isOneLeader(address)
  185. }
  186. func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  187. if isAdd {
  188. if filers.leaders.addLeaderIfVacant(address) {
  189. // has added the address as one leader
  190. result = append(result, &master_pb.KeepConnectedResponse{
  191. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  192. FilerGroup: string(filerGroup),
  193. NodeType: nodeType,
  194. Address: string(address),
  195. IsLeader: true,
  196. IsAdd: true,
  197. },
  198. })
  199. } else {
  200. result = append(result, &master_pb.KeepConnectedResponse{
  201. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  202. FilerGroup: string(filerGroup),
  203. NodeType: nodeType,
  204. Address: string(address),
  205. IsLeader: false,
  206. IsAdd: true,
  207. },
  208. })
  209. }
  210. } else {
  211. if filers.leaders.removeLeaderIfExists(address) {
  212. result = append(result, &master_pb.KeepConnectedResponse{
  213. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  214. FilerGroup: string(filerGroup),
  215. NodeType: nodeType,
  216. Address: string(address),
  217. IsLeader: true,
  218. IsAdd: false,
  219. },
  220. })
  221. // pick the freshest one, since it is less likely to go away
  222. var shortestDuration int64 = math.MaxInt64
  223. now := time.Now()
  224. var candidateAddress pb.ServerAddress
  225. for _, node := range filers.filers {
  226. if filers.leaders.isOneLeader(node.Address) {
  227. continue
  228. }
  229. duration := now.Sub(node.CreatedTs).Nanoseconds()
  230. if duration < shortestDuration {
  231. shortestDuration = duration
  232. candidateAddress = node.Address
  233. }
  234. }
  235. if candidateAddress != "" {
  236. filers.leaders.addLeaderIfVacant(candidateAddress)
  237. // added a new leader
  238. result = append(result, &master_pb.KeepConnectedResponse{
  239. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  240. NodeType: nodeType,
  241. Address: string(candidateAddress),
  242. IsLeader: true,
  243. IsAdd: true,
  244. },
  245. })
  246. }
  247. } else {
  248. result = append(result, &master_pb.KeepConnectedResponse{
  249. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  250. FilerGroup: string(filerGroup),
  251. NodeType: nodeType,
  252. Address: string(address),
  253. IsLeader: false,
  254. IsAdd: false,
  255. },
  256. })
  257. }
  258. }
  259. return
  260. }
  261. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  262. if leaders.isOneLeader(address) {
  263. return
  264. }
  265. for i := 0; i < len(leaders.leaders); i++ {
  266. if leaders.leaders[i] == "" {
  267. leaders.leaders[i] = address
  268. hasChanged = true
  269. return
  270. }
  271. }
  272. return
  273. }
  274. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  275. if !leaders.isOneLeader(address) {
  276. return
  277. }
  278. for i := 0; i < len(leaders.leaders); i++ {
  279. if leaders.leaders[i] == address {
  280. leaders.leaders[i] = ""
  281. hasChanged = true
  282. return
  283. }
  284. }
  285. return
  286. }
  287. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  288. for i := 0; i < len(leaders.leaders); i++ {
  289. if leaders.leaders[i] == address {
  290. return true
  291. }
  292. }
  293. return false
  294. }
  295. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  296. for i := 0; i < len(leaders.leaders); i++ {
  297. if leaders.leaders[i] != "" {
  298. addresses = append(addresses, leaders.leaders[i])
  299. }
  300. }
  301. return
  302. }