You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

297 lines
8.6 KiB

13 years ago
13 years ago
6 years ago
6 years ago
13 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
4 years ago
9 years ago
4 years ago
  1. package topology
  2. import (
  3. "errors"
  4. "math/rand"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  14. )
  15. type NodeId string
  16. type Node interface {
  17. Id() NodeId
  18. String() string
  19. AvailableSpaceFor(option *VolumeGrowOption) int64
  20. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  21. UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
  22. UpAdjustMaxVolumeId(vid needle.VolumeId)
  23. GetDiskUsages() *DiskUsages
  24. GetMaxVolumeId() needle.VolumeId
  25. SetParent(Node)
  26. LinkChildNode(node Node)
  27. UnlinkChildNode(nodeId NodeId)
  28. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
  29. IsDataNode() bool
  30. IsRack() bool
  31. IsDataCenter() bool
  32. Children() []Node
  33. Parent() Node
  34. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  35. }
  36. type NodeImpl struct {
  37. diskUsages *DiskUsages
  38. id NodeId
  39. parent Node
  40. sync.RWMutex // lock children
  41. children map[NodeId]Node
  42. maxVolumeId needle.VolumeId
  43. //for rack, data center, topology
  44. nodeType string
  45. value interface{}
  46. }
  47. func (n *NodeImpl) GetDiskUsages() *DiskUsages {
  48. return n.diskUsages
  49. }
  50. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  51. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  52. var totalWeights int64
  53. var errs []string
  54. n.RLock()
  55. candidates := make([]Node, 0, len(n.children))
  56. candidatesWeights := make([]int64, 0, len(n.children))
  57. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  58. for _, node := range n.children {
  59. if node.AvailableSpaceFor(option) <= 0 {
  60. continue
  61. }
  62. totalWeights += node.AvailableSpaceFor(option)
  63. candidates = append(candidates, node)
  64. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  65. }
  66. n.RUnlock()
  67. if len(candidates) < numberOfNodes {
  68. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  69. return nil, nil, errors.New("Not enough data nodes found!")
  70. }
  71. //pick nodes randomly by weights, the node picked earlier has higher final weights
  72. sortedCandidates := make([]Node, 0, len(candidates))
  73. for i := 0; i < len(candidates); i++ {
  74. weightsInterval := rand.Int63n(totalWeights)
  75. lastWeights := int64(0)
  76. for k, weights := range candidatesWeights {
  77. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  78. sortedCandidates = append(sortedCandidates, candidates[k])
  79. candidatesWeights[k] = 0
  80. totalWeights -= weights
  81. break
  82. }
  83. lastWeights += weights
  84. }
  85. }
  86. restNodes = make([]Node, 0, numberOfNodes-1)
  87. ret := false
  88. n.RLock()
  89. for k, node := range sortedCandidates {
  90. if err := filterFirstNodeFn(node); err == nil {
  91. firstNode = node
  92. if k >= numberOfNodes-1 {
  93. restNodes = sortedCandidates[:numberOfNodes-1]
  94. } else {
  95. restNodes = append(restNodes, sortedCandidates[:k]...)
  96. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  97. }
  98. ret = true
  99. break
  100. } else {
  101. errs = append(errs, string(node.Id())+":"+err.Error())
  102. }
  103. }
  104. n.RUnlock()
  105. if !ret {
  106. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  107. }
  108. return
  109. }
  110. func (n *NodeImpl) IsDataNode() bool {
  111. return n.nodeType == "DataNode"
  112. }
  113. func (n *NodeImpl) IsRack() bool {
  114. return n.nodeType == "Rack"
  115. }
  116. func (n *NodeImpl) IsDataCenter() bool {
  117. return n.nodeType == "DataCenter"
  118. }
  119. func (n *NodeImpl) String() string {
  120. if n.parent != nil {
  121. return n.parent.String() + ":" + string(n.id)
  122. }
  123. return string(n.id)
  124. }
  125. func (n *NodeImpl) Id() NodeId {
  126. return n.id
  127. }
  128. func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  129. return n.diskUsages.getOrCreateDisk(diskType)
  130. }
  131. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  132. t := n.getOrCreateDisk(option.DiskType)
  133. freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
  134. ecShardCount := atomic.LoadInt64(&t.ecShardCount)
  135. if ecShardCount > 0 {
  136. freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
  137. }
  138. return freeVolumeSlotCount
  139. }
  140. func (n *NodeImpl) SetParent(node Node) {
  141. n.parent = node
  142. }
  143. func (n *NodeImpl) Children() (ret []Node) {
  144. n.RLock()
  145. defer n.RUnlock()
  146. for _, c := range n.children {
  147. ret = append(ret, c)
  148. }
  149. return ret
  150. }
  151. func (n *NodeImpl) Parent() Node {
  152. return n.parent
  153. }
  154. func (n *NodeImpl) GetValue() interface{} {
  155. return n.value
  156. }
  157. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  158. n.RLock()
  159. defer n.RUnlock()
  160. for _, node := range n.children {
  161. freeSpace := node.AvailableSpaceFor(option)
  162. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  163. if freeSpace <= 0 {
  164. continue
  165. }
  166. if r >= freeSpace {
  167. r -= freeSpace
  168. } else {
  169. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  170. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  171. dn := node.(*DataNode)
  172. if dn.IsTerminating {
  173. continue
  174. }
  175. return dn, nil
  176. }
  177. assignedNode, err = node.ReserveOneVolume(r, option)
  178. if err == nil {
  179. return
  180. }
  181. }
  182. }
  183. return nil, errors.New("No free volume slot found!")
  184. }
  185. func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
  186. for diskType, diskUsage := range deltaDiskUsages.usages {
  187. existingDisk := n.getOrCreateDisk(diskType)
  188. existingDisk.addDiskUsageCounts(diskUsage)
  189. }
  190. if n.parent != nil {
  191. n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
  192. }
  193. }
  194. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  195. if n.maxVolumeId < vid {
  196. n.maxVolumeId = vid
  197. if n.parent != nil {
  198. n.parent.UpAdjustMaxVolumeId(vid)
  199. }
  200. }
  201. }
  202. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  203. return n.maxVolumeId
  204. }
  205. func (n *NodeImpl) LinkChildNode(node Node) {
  206. n.Lock()
  207. defer n.Unlock()
  208. n.doLinkChildNode(node)
  209. }
  210. func (n *NodeImpl) doLinkChildNode(node Node) {
  211. if n.children[node.Id()] == nil {
  212. n.children[node.Id()] = node
  213. n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
  214. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  215. node.SetParent(n)
  216. glog.V(0).Infoln(n, "adds child", node.Id())
  217. }
  218. }
  219. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  220. n.Lock()
  221. defer n.Unlock()
  222. node := n.children[nodeId]
  223. if node != nil {
  224. node.SetParent(nil)
  225. delete(n.children, node.Id())
  226. n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
  227. glog.V(0).Infoln(n, "removes", node.Id())
  228. }
  229. }
  230. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64) {
  231. if n.IsRack() {
  232. for _, c := range n.Children() {
  233. dn := c.(*DataNode) //can not cast n to DataNode
  234. dn.RLock()
  235. for _, v := range dn.GetVolumes() {
  236. topo := n.GetTopology()
  237. diskType := types.ToDiskType(v.DiskType)
  238. vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
  239. if v.Size >= volumeSizeLimit {
  240. vl.accessLock.RLock()
  241. vacuumTime, ok := vl.vacuumedVolumes[v.Id]
  242. vl.accessLock.RUnlock()
  243. // If a volume has been vacuumed in the past 20 seconds, we do not check whether it has reached full capacity.
  244. // After 20s(grpc timeout), theoretically all the heartbeats of the volume server have reached the master,
  245. // the volume size should be correct, not the size before the vacuum.
  246. if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
  247. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  248. topo.chanFullVolumes <- v
  249. }
  250. } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
  251. topo.chanCrowdedVolumes <- v
  252. }
  253. copyCount := v.ReplicaPlacement.GetCopyCount()
  254. if copyCount > 1 {
  255. if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
  256. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
  257. } else {
  258. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
  259. }
  260. }
  261. }
  262. dn.RUnlock()
  263. }
  264. } else {
  265. for _, c := range n.Children() {
  266. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit, growThreshold)
  267. }
  268. }
  269. }
  270. func (n *NodeImpl) GetTopology() *Topology {
  271. var p Node
  272. p = n
  273. for p.Parent() != nil {
  274. p = p.Parent()
  275. }
  276. return p.GetValue().(*Topology)
  277. }