You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
5.3 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. MasterType = "master"
  10. VolumeServerType = "volumeServer"
  11. FilerType = "filer"
  12. BrokerType = "broker"
  13. )
  14. type FilerGroupName string
  15. type DataCenter string
  16. type Rack string
  17. type Leaders struct {
  18. leaders [3]pb.ServerAddress
  19. }
  20. type ClusterNode struct {
  21. Address pb.ServerAddress
  22. Version string
  23. counter int
  24. CreatedTs time.Time
  25. DataCenter DataCenter
  26. Rack Rack
  27. }
  28. type GroupMembers struct {
  29. members map[pb.ServerAddress]*ClusterNode
  30. }
  31. type ClusterNodeGroups struct {
  32. groupMembers map[FilerGroupName]*GroupMembers
  33. sync.RWMutex
  34. }
  35. type Cluster struct {
  36. filerGroups *ClusterNodeGroups
  37. brokerGroups *ClusterNodeGroups
  38. }
  39. func newClusterNodeGroups() *ClusterNodeGroups {
  40. return &ClusterNodeGroups{
  41. groupMembers: map[FilerGroupName]*GroupMembers{},
  42. }
  43. }
  44. func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
  45. members, found := g.groupMembers[filerGroup]
  46. if !found && createIfNotFound {
  47. members = &GroupMembers{
  48. members: make(map[pb.ServerAddress]*ClusterNode),
  49. }
  50. g.groupMembers[filerGroup] = members
  51. }
  52. return members
  53. }
  54. func (m *GroupMembers) addMember(dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) *ClusterNode {
  55. if existingNode, found := m.members[address]; found {
  56. existingNode.counter++
  57. return nil
  58. }
  59. t := &ClusterNode{
  60. Address: address,
  61. Version: version,
  62. counter: 1,
  63. CreatedTs: time.Now(),
  64. DataCenter: dataCenter,
  65. Rack: rack,
  66. }
  67. m.members[address] = t
  68. return t
  69. }
  70. func (m *GroupMembers) removeMember(address pb.ServerAddress) bool {
  71. if existingNode, found := m.members[address]; !found {
  72. return false
  73. } else {
  74. existingNode.counter--
  75. if existingNode.counter <= 0 {
  76. delete(m.members, address)
  77. return true
  78. }
  79. }
  80. return false
  81. }
  82. func (m *GroupMembers) GetMembers() (addresses []pb.ServerAddress) {
  83. for k := range m.members {
  84. addresses = append(addresses, k)
  85. }
  86. return
  87. }
  88. func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  89. g.Lock()
  90. defer g.Unlock()
  91. m := g.getGroupMembers(filerGroup, true)
  92. if t := m.addMember(dataCenter, rack, address, version); t != nil {
  93. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  94. }
  95. return nil
  96. }
  97. func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  98. g.Lock()
  99. defer g.Unlock()
  100. m := g.getGroupMembers(filerGroup, false)
  101. if m == nil {
  102. return nil
  103. }
  104. if m.removeMember(address) {
  105. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  106. }
  107. return nil
  108. }
  109. func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) {
  110. g.Lock()
  111. defer g.Unlock()
  112. m := g.getGroupMembers(filerGroup, false)
  113. if m == nil {
  114. return nil
  115. }
  116. for _, node := range m.members {
  117. nodes = append(nodes, node)
  118. }
  119. return
  120. }
  121. func NewCluster() *Cluster {
  122. return &Cluster{
  123. filerGroups: newClusterNodeGroups(),
  124. brokerGroups: newClusterNodeGroups(),
  125. }
  126. }
  127. func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers {
  128. switch nodeType {
  129. case FilerType:
  130. return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound)
  131. case BrokerType:
  132. return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound)
  133. }
  134. return nil
  135. }
  136. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  137. filerGroup := FilerGroupName(ns)
  138. switch nodeType {
  139. case FilerType:
  140. return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  141. case BrokerType:
  142. return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  143. case MasterType:
  144. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  145. }
  146. return nil
  147. }
  148. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  149. filerGroup := FilerGroupName(ns)
  150. switch nodeType {
  151. case FilerType:
  152. return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  153. case BrokerType:
  154. return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  155. case MasterType:
  156. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  157. }
  158. return nil
  159. }
  160. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) {
  161. switch nodeType {
  162. case FilerType:
  163. return cluster.filerGroups.ListClusterNode(filerGroup)
  164. case BrokerType:
  165. return cluster.brokerGroups.ListClusterNode(filerGroup)
  166. case MasterType:
  167. }
  168. return
  169. }
  170. func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  171. result = append(result, &master_pb.KeepConnectedResponse{
  172. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  173. FilerGroup: string(filerGroup),
  174. NodeType: nodeType,
  175. Address: string(address),
  176. IsAdd: isAdd,
  177. },
  178. })
  179. return
  180. }