You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

363 lines
9.2 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type FilerGroup string
  16. type Filers struct {
  17. filers map[pb.ServerAddress]*ClusterNode
  18. leaders *Leaders
  19. }
  20. type Leaders struct {
  21. leaders [3]pb.ServerAddress
  22. }
  23. type DataCenter string
  24. type Rack string
  25. type DataCenterBrokers struct {
  26. brokers map[Rack]*RackBrokers
  27. }
  28. type RackBrokers struct {
  29. brokers map[pb.ServerAddress]*ClusterNode
  30. }
  31. type ClusterNode struct {
  32. Address pb.ServerAddress
  33. Version string
  34. counter int
  35. CreatedTs time.Time
  36. DataCenter DataCenter
  37. Rack Rack
  38. }
  39. type Cluster struct {
  40. filerGroup2filers map[FilerGroup]*Filers
  41. filersLock sync.RWMutex
  42. brokers map[DataCenter]*DataCenterBrokers
  43. brokersLock sync.RWMutex
  44. }
  45. func NewCluster() *Cluster {
  46. return &Cluster{
  47. filerGroup2filers: make(map[FilerGroup]*Filers),
  48. brokers: make(map[DataCenter]*DataCenterBrokers),
  49. }
  50. }
  51. func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
  52. filers, found := cluster.filerGroup2filers[filerGroup]
  53. if !found && createIfNotFound {
  54. filers = &Filers{
  55. filers: make(map[pb.ServerAddress]*ClusterNode),
  56. leaders: &Leaders{},
  57. }
  58. cluster.filerGroup2filers[filerGroup] = filers
  59. }
  60. return filers
  61. }
  62. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  63. filerGroup := FilerGroup(ns)
  64. switch nodeType {
  65. case FilerType:
  66. cluster.filersLock.Lock()
  67. defer cluster.filersLock.Unlock()
  68. filers := cluster.getFilers(filerGroup, true)
  69. if existingNode, found := filers.filers[address]; found {
  70. existingNode.counter++
  71. return nil
  72. }
  73. filers.filers[address] = &ClusterNode{
  74. Address: address,
  75. Version: version,
  76. counter: 1,
  77. CreatedTs: time.Now(),
  78. DataCenter: dataCenter,
  79. Rack: rack,
  80. }
  81. return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
  82. case BrokerType:
  83. cluster.brokersLock.Lock()
  84. defer cluster.brokersLock.Unlock()
  85. existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
  86. if !foundDataCenter {
  87. existingDataCenterBrokers = &DataCenterBrokers{
  88. brokers: make(map[Rack]*RackBrokers),
  89. }
  90. cluster.brokers[dataCenter] = existingDataCenterBrokers
  91. }
  92. existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack]
  93. if !foundRack {
  94. existingRackBrokers = &RackBrokers{
  95. brokers: make(map[pb.ServerAddress]*ClusterNode),
  96. }
  97. existingDataCenterBrokers.brokers[rack] = existingRackBrokers
  98. }
  99. if existingBroker, found := existingRackBrokers.brokers[address]; found {
  100. existingBroker.counter++
  101. return nil
  102. }
  103. existingRackBrokers.brokers[address] = &ClusterNode{
  104. Address: address,
  105. Version: version,
  106. counter: 1,
  107. CreatedTs: time.Now(),
  108. DataCenter: dataCenter,
  109. Rack: rack,
  110. }
  111. return []*master_pb.KeepConnectedResponse{
  112. {
  113. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  114. NodeType: nodeType,
  115. Address: string(address),
  116. IsAdd: true,
  117. },
  118. },
  119. }
  120. case MasterType:
  121. return []*master_pb.KeepConnectedResponse{
  122. {
  123. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  124. NodeType: nodeType,
  125. Address: string(address),
  126. IsAdd: true,
  127. },
  128. },
  129. }
  130. }
  131. return nil
  132. }
  133. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  134. filerGroup := FilerGroup(ns)
  135. switch nodeType {
  136. case FilerType:
  137. cluster.filersLock.Lock()
  138. defer cluster.filersLock.Unlock()
  139. filers := cluster.getFilers(filerGroup, false)
  140. if filers == nil {
  141. return nil
  142. }
  143. if existingNode, found := filers.filers[address]; !found {
  144. return nil
  145. } else {
  146. existingNode.counter--
  147. if existingNode.counter <= 0 {
  148. delete(filers.filers, address)
  149. return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
  150. }
  151. }
  152. case BrokerType:
  153. cluster.brokersLock.Lock()
  154. defer cluster.brokersLock.Unlock()
  155. existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
  156. if !foundDataCenter {
  157. return nil
  158. }
  159. existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)]
  160. if !foundRack {
  161. return nil
  162. }
  163. existingBroker, found := existingRackBrokers.brokers[address]
  164. if !found {
  165. return nil
  166. }
  167. existingBroker.counter--
  168. if existingBroker.counter <= 0 {
  169. delete(existingRackBrokers.brokers, address)
  170. return []*master_pb.KeepConnectedResponse{
  171. {
  172. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  173. NodeType: nodeType,
  174. Address: string(address),
  175. IsAdd: false,
  176. },
  177. },
  178. }
  179. }
  180. return nil
  181. case MasterType:
  182. return []*master_pb.KeepConnectedResponse{
  183. {
  184. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  185. NodeType: nodeType,
  186. Address: string(address),
  187. IsAdd: false,
  188. },
  189. },
  190. }
  191. }
  192. return nil
  193. }
  194. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
  195. switch nodeType {
  196. case FilerType:
  197. cluster.filersLock.RLock()
  198. defer cluster.filersLock.RUnlock()
  199. filers := cluster.getFilers(filerGroup, false)
  200. if filers == nil {
  201. return
  202. }
  203. for _, node := range filers.filers {
  204. nodes = append(nodes, node)
  205. }
  206. case BrokerType:
  207. cluster.brokersLock.RLock()
  208. defer cluster.brokersLock.RUnlock()
  209. for _, dcNodes := range cluster.brokers {
  210. for _, rackNodes := range dcNodes.brokers {
  211. for _, node := range rackNodes.brokers {
  212. nodes = append(nodes, node)
  213. }
  214. }
  215. }
  216. case MasterType:
  217. }
  218. return
  219. }
  220. func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool {
  221. filers := cluster.getFilers(filerGroup, false)
  222. if filers == nil {
  223. return false
  224. }
  225. return filers.leaders.isOneLeader(address)
  226. }
  227. func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  228. if isAdd {
  229. if filers.leaders.addLeaderIfVacant(address) {
  230. // has added the address as one leader
  231. result = append(result, &master_pb.KeepConnectedResponse{
  232. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  233. FilerGroup: string(filerGroup),
  234. NodeType: nodeType,
  235. Address: string(address),
  236. IsLeader: true,
  237. IsAdd: true,
  238. },
  239. })
  240. } else {
  241. result = append(result, &master_pb.KeepConnectedResponse{
  242. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  243. FilerGroup: string(filerGroup),
  244. NodeType: nodeType,
  245. Address: string(address),
  246. IsLeader: false,
  247. IsAdd: true,
  248. },
  249. })
  250. }
  251. } else {
  252. if filers.leaders.removeLeaderIfExists(address) {
  253. result = append(result, &master_pb.KeepConnectedResponse{
  254. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  255. FilerGroup: string(filerGroup),
  256. NodeType: nodeType,
  257. Address: string(address),
  258. IsLeader: true,
  259. IsAdd: false,
  260. },
  261. })
  262. // pick the freshest one, since it is less likely to go away
  263. var shortestDuration int64 = math.MaxInt64
  264. now := time.Now()
  265. var candidateAddress pb.ServerAddress
  266. for _, node := range filers.filers {
  267. if filers.leaders.isOneLeader(node.Address) {
  268. continue
  269. }
  270. duration := now.Sub(node.CreatedTs).Nanoseconds()
  271. if duration < shortestDuration {
  272. shortestDuration = duration
  273. candidateAddress = node.Address
  274. }
  275. }
  276. if candidateAddress != "" {
  277. filers.leaders.addLeaderIfVacant(candidateAddress)
  278. // added a new leader
  279. result = append(result, &master_pb.KeepConnectedResponse{
  280. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  281. NodeType: nodeType,
  282. Address: string(candidateAddress),
  283. IsLeader: true,
  284. IsAdd: true,
  285. },
  286. })
  287. }
  288. } else {
  289. result = append(result, &master_pb.KeepConnectedResponse{
  290. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  291. FilerGroup: string(filerGroup),
  292. NodeType: nodeType,
  293. Address: string(address),
  294. IsLeader: false,
  295. IsAdd: false,
  296. },
  297. })
  298. }
  299. }
  300. return
  301. }
  302. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  303. if leaders.isOneLeader(address) {
  304. return
  305. }
  306. for i := 0; i < len(leaders.leaders); i++ {
  307. if leaders.leaders[i] == "" {
  308. leaders.leaders[i] = address
  309. hasChanged = true
  310. return
  311. }
  312. }
  313. return
  314. }
  315. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  316. if !leaders.isOneLeader(address) {
  317. return
  318. }
  319. for i := 0; i < len(leaders.leaders); i++ {
  320. if leaders.leaders[i] == address {
  321. leaders.leaders[i] = ""
  322. hasChanged = true
  323. return
  324. }
  325. }
  326. return
  327. }
  328. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  329. for i := 0; i < len(leaders.leaders); i++ {
  330. if leaders.leaders[i] == address {
  331. return true
  332. }
  333. }
  334. return false
  335. }
  336. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  337. for i := 0; i < len(leaders.leaders); i++ {
  338. if leaders.leaders[i] != "" {
  339. addresses = append(addresses, leaders.leaders[i])
  340. }
  341. }
  342. return
  343. }