You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

355 lines
8.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type FilerGroup string
  16. type Filers struct {
  17. filers map[pb.ServerAddress]*ClusterNode
  18. leaders *Leaders
  19. }
  20. type Leaders struct {
  21. leaders [3]pb.ServerAddress
  22. }
  23. type DataCenter string
  24. type Rack string
  25. type DataCenterBrokers struct {
  26. brokers map[Rack]*RackBrokers
  27. }
  28. type RackBrokers struct {
  29. brokers map[pb.ServerAddress]*ClusterNode
  30. }
  31. type ClusterNode struct {
  32. Address pb.ServerAddress
  33. Version string
  34. counter int
  35. CreatedTs time.Time
  36. }
  37. type Cluster struct {
  38. filerGroup2filers map[FilerGroup]*Filers
  39. filersLock sync.RWMutex
  40. brokers map[DataCenter]*DataCenterBrokers
  41. brokersLock sync.RWMutex
  42. }
  43. func NewCluster() *Cluster {
  44. return &Cluster{
  45. filerGroup2filers: make(map[FilerGroup]*Filers),
  46. brokers: make(map[DataCenter]*DataCenterBrokers),
  47. }
  48. }
  49. func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
  50. filers, found := cluster.filerGroup2filers[filerGroup]
  51. if !found && createIfNotFound {
  52. filers = &Filers{
  53. filers: make(map[pb.ServerAddress]*ClusterNode),
  54. leaders: &Leaders{},
  55. }
  56. cluster.filerGroup2filers[filerGroup] = filers
  57. }
  58. return filers
  59. }
  60. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  61. filerGroup := FilerGroup(ns)
  62. switch nodeType {
  63. case FilerType:
  64. cluster.filersLock.Lock()
  65. defer cluster.filersLock.Unlock()
  66. filers := cluster.getFilers(filerGroup, true)
  67. if existingNode, found := filers.filers[address]; found {
  68. existingNode.counter++
  69. return nil
  70. }
  71. filers.filers[address] = &ClusterNode{
  72. Address: address,
  73. Version: version,
  74. counter: 1,
  75. CreatedTs: time.Now(),
  76. }
  77. return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address)
  78. case BrokerType:
  79. cluster.brokersLock.Lock()
  80. defer cluster.brokersLock.Unlock()
  81. existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
  82. if !foundDataCenter {
  83. existingDataCenterBrokers = &DataCenterBrokers{
  84. brokers: make(map[Rack]*RackBrokers),
  85. }
  86. }
  87. existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack]
  88. if !foundRack {
  89. existingRackBrokers = &RackBrokers{
  90. brokers: make(map[pb.ServerAddress]*ClusterNode),
  91. }
  92. }
  93. if existingBroker, found := existingRackBrokers.brokers[address]; found {
  94. existingBroker.counter++
  95. return nil
  96. }
  97. existingRackBrokers.brokers[address] = &ClusterNode{
  98. Address: address,
  99. Version: version,
  100. counter: 1,
  101. CreatedTs: time.Now(),
  102. }
  103. return []*master_pb.KeepConnectedResponse{
  104. {
  105. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  106. NodeType: nodeType,
  107. Address: string(address),
  108. IsAdd: true,
  109. },
  110. },
  111. }
  112. case MasterType:
  113. return []*master_pb.KeepConnectedResponse{
  114. {
  115. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  116. NodeType: nodeType,
  117. Address: string(address),
  118. IsAdd: true,
  119. },
  120. },
  121. }
  122. }
  123. return nil
  124. }
  125. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  126. filerGroup := FilerGroup(ns)
  127. switch nodeType {
  128. case FilerType:
  129. cluster.filersLock.Lock()
  130. defer cluster.filersLock.Unlock()
  131. filers := cluster.getFilers(filerGroup, false)
  132. if filers == nil {
  133. return nil
  134. }
  135. if existingNode, found := filers.filers[address]; !found {
  136. return nil
  137. } else {
  138. existingNode.counter--
  139. if existingNode.counter <= 0 {
  140. delete(filers.filers, address)
  141. return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address)
  142. }
  143. }
  144. case BrokerType:
  145. cluster.brokersLock.Lock()
  146. defer cluster.brokersLock.Unlock()
  147. existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
  148. if !foundDataCenter {
  149. return nil
  150. }
  151. existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)]
  152. if !foundRack {
  153. return nil
  154. }
  155. existingBroker, found := existingRackBrokers.brokers[address]
  156. if !found {
  157. return nil
  158. }
  159. existingBroker.counter--
  160. if existingBroker.counter <= 0 {
  161. delete(existingRackBrokers.brokers, address)
  162. return []*master_pb.KeepConnectedResponse{
  163. {
  164. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  165. NodeType: nodeType,
  166. Address: string(address),
  167. IsAdd: false,
  168. },
  169. },
  170. }
  171. }
  172. return nil
  173. case MasterType:
  174. return []*master_pb.KeepConnectedResponse{
  175. {
  176. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  177. NodeType: nodeType,
  178. Address: string(address),
  179. IsAdd: false,
  180. },
  181. },
  182. }
  183. }
  184. return nil
  185. }
  186. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
  187. switch nodeType {
  188. case FilerType:
  189. cluster.filersLock.RLock()
  190. defer cluster.filersLock.RUnlock()
  191. filers := cluster.getFilers(filerGroup, false)
  192. if filers == nil {
  193. return
  194. }
  195. for _, node := range filers.filers {
  196. nodes = append(nodes, node)
  197. }
  198. case BrokerType:
  199. cluster.brokersLock.RLock()
  200. defer cluster.brokersLock.RUnlock()
  201. for _, dcNodes := range cluster.brokers {
  202. for _, rackNodes := range dcNodes.brokers {
  203. for _, node := range rackNodes.brokers {
  204. nodes = append(nodes, node)
  205. }
  206. }
  207. }
  208. case MasterType:
  209. }
  210. return
  211. }
  212. func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool {
  213. filers := cluster.getFilers(filerGroup, false)
  214. if filers == nil {
  215. return false
  216. }
  217. return filers.leaders.isOneLeader(address)
  218. }
  219. func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  220. if isAdd {
  221. if filers.leaders.addLeaderIfVacant(address) {
  222. // has added the address as one leader
  223. result = append(result, &master_pb.KeepConnectedResponse{
  224. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  225. FilerGroup: string(filerGroup),
  226. NodeType: nodeType,
  227. Address: string(address),
  228. IsLeader: true,
  229. IsAdd: true,
  230. },
  231. })
  232. } else {
  233. result = append(result, &master_pb.KeepConnectedResponse{
  234. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  235. FilerGroup: string(filerGroup),
  236. NodeType: nodeType,
  237. Address: string(address),
  238. IsLeader: false,
  239. IsAdd: true,
  240. },
  241. })
  242. }
  243. } else {
  244. if filers.leaders.removeLeaderIfExists(address) {
  245. result = append(result, &master_pb.KeepConnectedResponse{
  246. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  247. FilerGroup: string(filerGroup),
  248. NodeType: nodeType,
  249. Address: string(address),
  250. IsLeader: true,
  251. IsAdd: false,
  252. },
  253. })
  254. // pick the freshest one, since it is less likely to go away
  255. var shortestDuration int64 = math.MaxInt64
  256. now := time.Now()
  257. var candidateAddress pb.ServerAddress
  258. for _, node := range filers.filers {
  259. if filers.leaders.isOneLeader(node.Address) {
  260. continue
  261. }
  262. duration := now.Sub(node.CreatedTs).Nanoseconds()
  263. if duration < shortestDuration {
  264. shortestDuration = duration
  265. candidateAddress = node.Address
  266. }
  267. }
  268. if candidateAddress != "" {
  269. filers.leaders.addLeaderIfVacant(candidateAddress)
  270. // added a new leader
  271. result = append(result, &master_pb.KeepConnectedResponse{
  272. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  273. NodeType: nodeType,
  274. Address: string(candidateAddress),
  275. IsLeader: true,
  276. IsAdd: true,
  277. },
  278. })
  279. }
  280. } else {
  281. result = append(result, &master_pb.KeepConnectedResponse{
  282. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  283. FilerGroup: string(filerGroup),
  284. NodeType: nodeType,
  285. Address: string(address),
  286. IsLeader: false,
  287. IsAdd: false,
  288. },
  289. })
  290. }
  291. }
  292. return
  293. }
  294. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  295. if leaders.isOneLeader(address) {
  296. return
  297. }
  298. for i := 0; i < len(leaders.leaders); i++ {
  299. if leaders.leaders[i] == "" {
  300. leaders.leaders[i] = address
  301. hasChanged = true
  302. return
  303. }
  304. }
  305. return
  306. }
  307. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  308. if !leaders.isOneLeader(address) {
  309. return
  310. }
  311. for i := 0; i < len(leaders.leaders); i++ {
  312. if leaders.leaders[i] == address {
  313. leaders.leaders[i] = ""
  314. hasChanged = true
  315. return
  316. }
  317. }
  318. return
  319. }
  320. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  321. for i := 0; i < len(leaders.leaders); i++ {
  322. if leaders.leaders[i] == address {
  323. return true
  324. }
  325. }
  326. return false
  327. }
  328. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  329. for i := 0; i < len(leaders.leaders); i++ {
  330. if leaders.leaders[i] != "" {
  331. addresses = append(addresses, leaders.leaders[i])
  332. }
  333. }
  334. return
  335. }