You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

153 lines
4.4 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package cluster
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. MasterType = "master"
  10. VolumeServerType = "volumeServer"
  11. FilerType = "filer"
  12. BrokerType = "broker"
  13. )
  14. type FilerGroupName string
  15. type DataCenter string
  16. type Rack string
// ClusterNode describes one member of a node group.
type ClusterNode struct {
	// Address is the server address of the node; group members are keyed by it.
	Address pb.ServerAddress
	// Version is the version string supplied when the node was added.
	Version string
	// counter is not read or written in the visible part of this file.
	// NOTE(review): presumably a registration count maintained by
	// GroupMembers — confirm against addMember/removeMember.
	counter int
	// CreatedTs is a timestamp for the node.
	// NOTE(review): presumably set when the node is first added — confirm.
	CreatedTs time.Time
	// DataCenter is the data center label supplied when the node was added.
	DataCenter DataCenter
	// Rack is the rack label supplied when the node was added.
	Rack Rack
}
// ClusterNodeGroups holds the members of every group for one kind of node.
// The embedded RWMutex is taken by AddClusterNode, RemoveClusterNode and
// ListClusterNode before they touch groupMembers.
type ClusterNodeGroups struct {
	// groupMembers maps a group name to that group's members.
	groupMembers map[FilerGroupName]*GroupMembers
	sync.RWMutex
}
// Cluster tracks group membership separately for filers and for brokers.
type Cluster struct {
	// filerGroups holds the groups used for nodes of type FilerType.
	filerGroups *ClusterNodeGroups
	// brokerGroups holds the groups used for nodes of type BrokerType.
	brokerGroups *ClusterNodeGroups
}
  33. func newClusterNodeGroups() *ClusterNodeGroups {
  34. return &ClusterNodeGroups{
  35. groupMembers: map[FilerGroupName]*GroupMembers{},
  36. }
  37. }
  38. func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
  39. members, found := g.groupMembers[filerGroup]
  40. if !found && createIfNotFound {
  41. members = &GroupMembers{
  42. members: make(map[pb.ServerAddress]*ClusterNode),
  43. }
  44. g.groupMembers[filerGroup] = members
  45. }
  46. return members
  47. }
  48. func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  49. g.Lock()
  50. defer g.Unlock()
  51. m := g.getGroupMembers(filerGroup, true)
  52. if t := m.addMember(dataCenter, rack, address, version); t != nil {
  53. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  54. }
  55. return nil
  56. }
  57. func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  58. g.Lock()
  59. defer g.Unlock()
  60. m := g.getGroupMembers(filerGroup, false)
  61. if m == nil {
  62. return nil
  63. }
  64. if m.removeMember(address) {
  65. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  66. }
  67. return nil
  68. }
  69. func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) {
  70. g.Lock()
  71. defer g.Unlock()
  72. m := g.getGroupMembers(filerGroup, false)
  73. if m == nil {
  74. return nil
  75. }
  76. for _, node := range m.members {
  77. nodes = append(nodes, node)
  78. }
  79. return
  80. }
  81. func NewCluster() *Cluster {
  82. return &Cluster{
  83. filerGroups: newClusterNodeGroups(),
  84. brokerGroups: newClusterNodeGroups(),
  85. }
  86. }
  87. func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers {
  88. switch nodeType {
  89. case FilerType:
  90. return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound)
  91. case BrokerType:
  92. return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound)
  93. }
  94. return nil
  95. }
  96. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  97. filerGroup := FilerGroupName(ns)
  98. switch nodeType {
  99. case FilerType:
  100. return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  101. case BrokerType:
  102. return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  103. case MasterType:
  104. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  105. }
  106. return nil
  107. }
  108. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  109. filerGroup := FilerGroupName(ns)
  110. switch nodeType {
  111. case FilerType:
  112. return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  113. case BrokerType:
  114. return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  115. case MasterType:
  116. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  117. }
  118. return nil
  119. }
  120. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) {
  121. switch nodeType {
  122. case FilerType:
  123. return cluster.filerGroups.ListClusterNode(filerGroup)
  124. case BrokerType:
  125. return cluster.brokerGroups.ListClusterNode(filerGroup)
  126. case MasterType:
  127. }
  128. return
  129. }
  130. func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  131. result = append(result, &master_pb.KeepConnectedResponse{
  132. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  133. FilerGroup: string(filerGroup),
  134. NodeType: nodeType,
  135. Address: string(address),
  136. IsAdd: isAdd,
  137. },
  138. })
  139. return
  140. }