You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

357 lines
10 KiB

13 years ago
6 years ago
13 years ago
6 years ago
6 years ago
13 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
9 years ago
  1. package topology
  2. import (
  3. "errors"
  4. "math/rand"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  // NodeId identifies a node in the topology tree. A parent node keys its
  // children map by this value.
  type NodeId string
  13. type Node interface {
  14. Id() NodeId
  15. String() string
  16. FreeSpace() int64
  17. ReserveOneVolume(r int64) (*DataNode, error)
  18. UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
  19. UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64)
  20. UpAdjustVolumeCountDelta(volumeCountDelta int64)
  21. UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64)
  22. UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
  23. UpAdjustEcShardCountDelta(ecShardCountDelta int64)
  24. UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
  25. UpAdjustMaxVolumeId(vid needle.VolumeId)
  26. GetVolumeCount() int64
  27. GetSsdVolumeCount() int64
  28. GetEcShardCount() int64
  29. GetActiveVolumeCount() int64
  30. GetRemoteVolumeCount() int64
  31. GetMaxVolumeCount() int64
  32. GetMaxSsdVolumeCount() int64
  33. GetMaxVolumeId() needle.VolumeId
  34. SetParent(Node)
  35. LinkChildNode(node Node)
  36. UnlinkChildNode(nodeId NodeId)
  37. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
  38. IsDataNode() bool
  39. IsRack() bool
  40. IsDataCenter() bool
  41. Children() []Node
  42. Parent() Node
  43. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  44. }
  45. type NodeImpl struct {
  46. volumeCount int64
  47. remoteVolumeCount int64
  48. ssdVolumeCount int64
  49. activeVolumeCount int64
  50. ecShardCount int64
  51. maxVolumeCount int64
  52. maxSsdVolumeCount int64
  53. id NodeId
  54. parent Node
  55. sync.RWMutex // lock children
  56. children map[NodeId]Node
  57. maxVolumeId needle.VolumeId
  58. //for rack, data center, topology
  59. nodeType string
  60. value interface{}
  61. }
  62. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  63. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  64. var totalWeights int64
  65. var errs []string
  66. n.RLock()
  67. candidates := make([]Node, 0, len(n.children))
  68. candidatesWeights := make([]int64, 0, len(n.children))
  69. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  70. for _, node := range n.children {
  71. if node.FreeSpace() <= 0 {
  72. continue
  73. }
  74. totalWeights += node.FreeSpace()
  75. candidates = append(candidates, node)
  76. candidatesWeights = append(candidatesWeights, node.FreeSpace())
  77. }
  78. n.RUnlock()
  79. if len(candidates) < numberOfNodes {
  80. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  81. return nil, nil, errors.New("No enough data node found!")
  82. }
  83. //pick nodes randomly by weights, the node picked earlier has higher final weights
  84. sortedCandidates := make([]Node, 0, len(candidates))
  85. for i := 0; i < len(candidates); i++ {
  86. weightsInterval := rand.Int63n(totalWeights)
  87. lastWeights := int64(0)
  88. for k, weights := range candidatesWeights {
  89. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  90. sortedCandidates = append(sortedCandidates, candidates[k])
  91. candidatesWeights[k] = 0
  92. totalWeights -= weights
  93. break
  94. }
  95. lastWeights += weights
  96. }
  97. }
  98. restNodes = make([]Node, 0, numberOfNodes-1)
  99. ret := false
  100. n.RLock()
  101. for k, node := range sortedCandidates {
  102. if err := filterFirstNodeFn(node); err == nil {
  103. firstNode = node
  104. if k >= numberOfNodes-1 {
  105. restNodes = sortedCandidates[:numberOfNodes-1]
  106. } else {
  107. restNodes = append(restNodes, sortedCandidates[:k]...)
  108. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  109. }
  110. ret = true
  111. break
  112. } else {
  113. errs = append(errs, string(node.Id())+":"+err.Error())
  114. }
  115. }
  116. n.RUnlock()
  117. if !ret {
  118. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  119. }
  120. return
  121. }
  122. func (n *NodeImpl) IsDataNode() bool {
  123. return n.nodeType == "DataNode"
  124. }
  125. func (n *NodeImpl) IsRack() bool {
  126. return n.nodeType == "Rack"
  127. }
  128. func (n *NodeImpl) IsDataCenter() bool {
  129. return n.nodeType == "DataCenter"
  130. }
  131. func (n *NodeImpl) String() string {
  132. if n.parent != nil {
  133. return n.parent.String() + ":" + string(n.id)
  134. }
  135. return string(n.id)
  136. }
  137. func (n *NodeImpl) Id() NodeId {
  138. return n.id
  139. }
  140. func (n *NodeImpl) FreeSpace() int64 {
  141. freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount
  142. if n.ecShardCount > 0 {
  143. freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
  144. }
  145. return freeVolumeSlotCount
  146. }
  147. func (n *NodeImpl) SetParent(node Node) {
  148. n.parent = node
  149. }
  150. func (n *NodeImpl) Children() (ret []Node) {
  151. n.RLock()
  152. defer n.RUnlock()
  153. for _, c := range n.children {
  154. ret = append(ret, c)
  155. }
  156. return ret
  157. }
  158. func (n *NodeImpl) Parent() Node {
  159. return n.parent
  160. }
  161. func (n *NodeImpl) GetValue() interface{} {
  162. return n.value
  163. }
  164. func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
  165. n.RLock()
  166. defer n.RUnlock()
  167. for _, node := range n.children {
  168. freeSpace := node.FreeSpace()
  169. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  170. if freeSpace <= 0 {
  171. continue
  172. }
  173. if r >= freeSpace {
  174. r -= freeSpace
  175. } else {
  176. if node.IsDataNode() && node.FreeSpace() > 0 {
  177. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  178. return node.(*DataNode), nil
  179. }
  180. assignedNode, err = node.ReserveOneVolume(r)
  181. if err == nil {
  182. return
  183. }
  184. }
  185. }
  186. return nil, errors.New("No free volume slot found!")
  187. }
  188. func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
  189. if maxVolumeCountDelta == 0 {
  190. return
  191. }
  192. atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
  193. if n.parent != nil {
  194. n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
  195. }
  196. }
  197. func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative
  198. if maxSsdVolumeCountDelta == 0 {
  199. return
  200. }
  201. atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta)
  202. if n.parent != nil {
  203. n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta)
  204. }
  205. }
  206. func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
  207. if volumeCountDelta == 0 {
  208. return
  209. }
  210. atomic.AddInt64(&n.volumeCount, volumeCountDelta)
  211. if n.parent != nil {
  212. n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
  213. }
  214. }
  215. func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
  216. if remoteVolumeCountDelta == 0 {
  217. return
  218. }
  219. atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
  220. if n.parent != nil {
  221. n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
  222. }
  223. }
  224. func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative
  225. if ssdVolumeCountDelta == 0 {
  226. return
  227. }
  228. atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta)
  229. if n.parent != nil {
  230. n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta)
  231. }
  232. }
  233. func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
  234. if ecShardCountDelta == 0 {
  235. return
  236. }
  237. atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
  238. if n.parent != nil {
  239. n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
  240. }
  241. }
  242. func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
  243. if activeVolumeCountDelta == 0 {
  244. return
  245. }
  246. atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
  247. if n.parent != nil {
  248. n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
  249. }
  250. }
  251. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  252. if n.maxVolumeId < vid {
  253. n.maxVolumeId = vid
  254. if n.parent != nil {
  255. n.parent.UpAdjustMaxVolumeId(vid)
  256. }
  257. }
  258. }
  259. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  260. return n.maxVolumeId
  261. }
  262. func (n *NodeImpl) GetVolumeCount() int64 {
  263. return n.volumeCount
  264. }
  265. func (n *NodeImpl) GetSsdVolumeCount() int64 {
  266. return n.ssdVolumeCount
  267. }
  268. func (n *NodeImpl) GetEcShardCount() int64 {
  269. return n.ecShardCount
  270. }
  271. func (n *NodeImpl) GetRemoteVolumeCount() int64 {
  272. return n.remoteVolumeCount
  273. }
  274. func (n *NodeImpl) GetActiveVolumeCount() int64 {
  275. return n.activeVolumeCount
  276. }
  277. func (n *NodeImpl) GetMaxVolumeCount() int64 {
  278. return n.maxVolumeCount
  279. }
  280. func (n *NodeImpl) GetMaxSsdVolumeCount() int64 {
  281. return n.maxSsdVolumeCount
  282. }
  283. func (n *NodeImpl) LinkChildNode(node Node) {
  284. n.Lock()
  285. defer n.Unlock()
  286. if n.children[node.Id()] == nil {
  287. n.children[node.Id()] = node
  288. n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
  289. n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount())
  290. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  291. n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
  292. n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount())
  293. n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
  294. n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
  295. n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
  296. node.SetParent(n)
  297. glog.V(0).Infoln(n, "adds child", node.Id())
  298. }
  299. }
  300. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  301. n.Lock()
  302. defer n.Unlock()
  303. node := n.children[nodeId]
  304. if node != nil {
  305. node.SetParent(nil)
  306. delete(n.children, node.Id())
  307. n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
  308. n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount())
  309. n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
  310. n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
  311. n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
  312. n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
  313. n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount())
  314. glog.V(0).Infoln(n, "removes", node.Id())
  315. }
  316. }
  317. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
  318. if n.IsRack() {
  319. for _, c := range n.Children() {
  320. dn := c.(*DataNode) //can not cast n to DataNode
  321. for _, v := range dn.GetVolumes() {
  322. if uint64(v.Size) >= volumeSizeLimit {
  323. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  324. n.GetTopology().chanFullVolumes <- v
  325. }
  326. }
  327. }
  328. } else {
  329. for _, c := range n.Children() {
  330. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
  331. }
  332. }
  333. }
  334. func (n *NodeImpl) GetTopology() *Topology {
  335. var p Node
  336. p = n
  337. for p.Parent() != nil {
  338. p = p.Parent()
  339. }
  340. return p.GetValue().(*Topology)
  341. }