You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

274 lines
7.8 KiB

13 years ago
6 years ago
13 years ago
6 years ago
6 years ago
13 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
9 years ago
4 years ago
  1. package topology
  2. import (
  3. "errors"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/stats"
  6. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "math/rand"
  10. "strings"
  11. "sync"
  12. )
  13. type NodeId string
  14. type Node interface {
  15. Id() NodeId
  16. String() string
  17. AvailableSpaceFor(option *VolumeGrowOption) int64
  18. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  19. UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
  20. UpAdjustMaxVolumeId(vid needle.VolumeId)
  21. GetDiskUsages() *DiskUsages
  22. GetMaxVolumeId() needle.VolumeId
  23. SetParent(Node)
  24. LinkChildNode(node Node)
  25. UnlinkChildNode(nodeId NodeId)
  26. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
  27. IsDataNode() bool
  28. IsRack() bool
  29. IsDataCenter() bool
  30. Children() []Node
  31. Parent() Node
  32. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  33. }
  34. type NodeImpl struct {
  35. diskUsages *DiskUsages
  36. id NodeId
  37. parent Node
  38. sync.RWMutex // lock children
  39. children map[NodeId]Node
  40. maxVolumeId needle.VolumeId
  41. //for rack, data center, topology
  42. nodeType string
  43. value interface{}
  44. }
  45. func (n *NodeImpl) GetDiskUsages() *DiskUsages {
  46. return n.diskUsages
  47. }
  48. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  49. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  50. var totalWeights int64
  51. var errs []string
  52. n.RLock()
  53. candidates := make([]Node, 0, len(n.children))
  54. candidatesWeights := make([]int64, 0, len(n.children))
  55. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  56. for _, node := range n.children {
  57. if node.AvailableSpaceFor(option) <= 0 {
  58. continue
  59. }
  60. totalWeights += node.AvailableSpaceFor(option)
  61. candidates = append(candidates, node)
  62. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  63. }
  64. n.RUnlock()
  65. if len(candidates) < numberOfNodes {
  66. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  67. return nil, nil, errors.New("No enough data node found!")
  68. }
  69. //pick nodes randomly by weights, the node picked earlier has higher final weights
  70. sortedCandidates := make([]Node, 0, len(candidates))
  71. for i := 0; i < len(candidates); i++ {
  72. weightsInterval := rand.Int63n(totalWeights)
  73. lastWeights := int64(0)
  74. for k, weights := range candidatesWeights {
  75. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  76. sortedCandidates = append(sortedCandidates, candidates[k])
  77. candidatesWeights[k] = 0
  78. totalWeights -= weights
  79. break
  80. }
  81. lastWeights += weights
  82. }
  83. }
  84. restNodes = make([]Node, 0, numberOfNodes-1)
  85. ret := false
  86. n.RLock()
  87. for k, node := range sortedCandidates {
  88. if err := filterFirstNodeFn(node); err == nil {
  89. firstNode = node
  90. if k >= numberOfNodes-1 {
  91. restNodes = sortedCandidates[:numberOfNodes-1]
  92. } else {
  93. restNodes = append(restNodes, sortedCandidates[:k]...)
  94. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  95. }
  96. ret = true
  97. break
  98. } else {
  99. errs = append(errs, string(node.Id())+":"+err.Error())
  100. }
  101. }
  102. n.RUnlock()
  103. if !ret {
  104. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  105. }
  106. return
  107. }
  108. func (n *NodeImpl) IsDataNode() bool {
  109. return n.nodeType == "DataNode"
  110. }
  111. func (n *NodeImpl) IsRack() bool {
  112. return n.nodeType == "Rack"
  113. }
  114. func (n *NodeImpl) IsDataCenter() bool {
  115. return n.nodeType == "DataCenter"
  116. }
  117. func (n *NodeImpl) String() string {
  118. if n.parent != nil {
  119. return n.parent.String() + ":" + string(n.id)
  120. }
  121. return string(n.id)
  122. }
  123. func (n *NodeImpl) Id() NodeId {
  124. return n.id
  125. }
  126. func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  127. return n.diskUsages.getOrCreateDisk(diskType)
  128. }
  129. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  130. t := n.getOrCreateDisk(option.DiskType)
  131. freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount
  132. if t.ecShardCount > 0 {
  133. freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1
  134. }
  135. return freeVolumeSlotCount
  136. }
  137. func (n *NodeImpl) SetParent(node Node) {
  138. n.parent = node
  139. }
  140. func (n *NodeImpl) Children() (ret []Node) {
  141. n.RLock()
  142. defer n.RUnlock()
  143. for _, c := range n.children {
  144. ret = append(ret, c)
  145. }
  146. return ret
  147. }
  148. func (n *NodeImpl) Parent() Node {
  149. return n.parent
  150. }
  151. func (n *NodeImpl) GetValue() interface{} {
  152. return n.value
  153. }
  154. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  155. n.RLock()
  156. defer n.RUnlock()
  157. for _, node := range n.children {
  158. freeSpace := node.AvailableSpaceFor(option)
  159. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  160. if freeSpace <= 0 {
  161. continue
  162. }
  163. if r >= freeSpace {
  164. r -= freeSpace
  165. } else {
  166. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  167. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  168. return node.(*DataNode), nil
  169. }
  170. assignedNode, err = node.ReserveOneVolume(r, option)
  171. if err == nil {
  172. return
  173. }
  174. }
  175. }
  176. return nil, errors.New("No free volume slot found!")
  177. }
  178. func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
  179. for diskType, diskUsage := range deltaDiskUsages.usages {
  180. existingDisk := n.getOrCreateDisk(diskType)
  181. existingDisk.addDiskUsageCounts(diskUsage)
  182. }
  183. if n.parent != nil {
  184. n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
  185. }
  186. }
  187. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  188. if n.maxVolumeId < vid {
  189. n.maxVolumeId = vid
  190. if n.parent != nil {
  191. n.parent.UpAdjustMaxVolumeId(vid)
  192. }
  193. }
  194. }
  195. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  196. return n.maxVolumeId
  197. }
  198. func (n *NodeImpl) LinkChildNode(node Node) {
  199. n.Lock()
  200. defer n.Unlock()
  201. n.doLinkChildNode(node)
  202. }
  203. func (n *NodeImpl) doLinkChildNode(node Node) {
  204. if n.children[node.Id()] == nil {
  205. n.children[node.Id()] = node
  206. n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
  207. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  208. node.SetParent(n)
  209. glog.V(0).Infoln(n, "adds child", node.Id())
  210. }
  211. }
  212. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  213. n.Lock()
  214. defer n.Unlock()
  215. node := n.children[nodeId]
  216. if node != nil {
  217. node.SetParent(nil)
  218. delete(n.children, node.Id())
  219. n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
  220. glog.V(0).Infoln(n, "removes", node.Id())
  221. }
  222. }
  223. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64) {
  224. if n.IsRack() {
  225. for _, c := range n.Children() {
  226. dn := c.(*DataNode) //can not cast n to DataNode
  227. for _, v := range dn.GetVolumes() {
  228. if v.Size >= volumeSizeLimit {
  229. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  230. n.GetTopology().chanFullVolumes <- v
  231. } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
  232. n.GetTopology().chanCrowdedVolumes <- v
  233. }
  234. copyCount := v.ReplicaPlacement.GetCopyCount()
  235. if copyCount > 1 {
  236. if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
  237. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
  238. } else {
  239. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
  240. }
  241. }
  242. }
  243. }
  244. } else {
  245. for _, c := range n.Children() {
  246. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit, growThreshold)
  247. }
  248. }
  249. }
  250. func (n *NodeImpl) GetTopology() *Topology {
  251. var p Node
  252. p = n
  253. for p.Parent() != nil {
  254. p = p.Parent()
  255. }
  256. return p.GetValue().(*Topology)
  257. }