You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

369 lines
11 KiB

13 years ago
6 years ago
13 years ago
6 years ago
6 years ago
13 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
6 years ago
6 years ago
9 years ago
  1. package topology
  2. import (
  3. "errors"
  4. "github.com/chrislusf/seaweedfs/weed/storage"
  5. "math/rand"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  11. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  12. )
  // NodeId is the identifier of a node; it is the key under which a parent
  // stores the node in its children map.
  type NodeId string
  14. type Node interface {
  15. Id() NodeId
  16. String() string
  17. FreeSpace() int64
  18. AvailableSpaceFor(option *VolumeGrowOption) int64
  19. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  20. UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
  21. UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64)
  22. UpAdjustVolumeCountDelta(volumeCountDelta int64)
  23. UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64)
  24. UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
  25. UpAdjustEcShardCountDelta(ecShardCountDelta int64)
  26. UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
  27. UpAdjustMaxVolumeId(vid needle.VolumeId)
  28. GetVolumeCount() int64
  29. GetSsdVolumeCount() int64
  30. GetEcShardCount() int64
  31. GetActiveVolumeCount() int64
  32. GetRemoteVolumeCount() int64
  33. GetMaxVolumeCount() int64
  34. GetMaxSsdVolumeCount() int64
  35. GetMaxVolumeId() needle.VolumeId
  36. SetParent(Node)
  37. LinkChildNode(node Node)
  38. UnlinkChildNode(nodeId NodeId)
  39. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
  40. IsDataNode() bool
  41. IsRack() bool
  42. IsDataCenter() bool
  43. Children() []Node
  44. Parent() Node
  45. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  46. }
  47. type NodeImpl struct {
  48. volumeCount int64
  49. remoteVolumeCount int64
  50. ssdVolumeCount int64
  51. activeVolumeCount int64
  52. ecShardCount int64
  53. maxVolumeCount int64
  54. maxSsdVolumeCount int64
  55. id NodeId
  56. parent Node
  57. sync.RWMutex // lock children
  58. children map[NodeId]Node
  59. maxVolumeId needle.VolumeId
  60. //for rack, data center, topology
  61. nodeType string
  62. value interface{}
  63. }
  64. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  65. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  66. var totalWeights int64
  67. var errs []string
  68. n.RLock()
  69. candidates := make([]Node, 0, len(n.children))
  70. candidatesWeights := make([]int64, 0, len(n.children))
  71. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  72. for _, node := range n.children {
  73. if node.AvailableSpaceFor(option) <= 0 {
  74. continue
  75. }
  76. totalWeights += node.AvailableSpaceFor(option)
  77. candidates = append(candidates, node)
  78. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  79. }
  80. n.RUnlock()
  81. if len(candidates) < numberOfNodes {
  82. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  83. return nil, nil, errors.New("No enough data node found!")
  84. }
  85. //pick nodes randomly by weights, the node picked earlier has higher final weights
  86. sortedCandidates := make([]Node, 0, len(candidates))
  87. for i := 0; i < len(candidates); i++ {
  88. weightsInterval := rand.Int63n(totalWeights)
  89. lastWeights := int64(0)
  90. for k, weights := range candidatesWeights {
  91. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  92. sortedCandidates = append(sortedCandidates, candidates[k])
  93. candidatesWeights[k] = 0
  94. totalWeights -= weights
  95. break
  96. }
  97. lastWeights += weights
  98. }
  99. }
  100. restNodes = make([]Node, 0, numberOfNodes-1)
  101. ret := false
  102. n.RLock()
  103. for k, node := range sortedCandidates {
  104. if err := filterFirstNodeFn(node); err == nil {
  105. firstNode = node
  106. if k >= numberOfNodes-1 {
  107. restNodes = sortedCandidates[:numberOfNodes-1]
  108. } else {
  109. restNodes = append(restNodes, sortedCandidates[:k]...)
  110. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  111. }
  112. ret = true
  113. break
  114. } else {
  115. errs = append(errs, string(node.Id())+":"+err.Error())
  116. }
  117. }
  118. n.RUnlock()
  119. if !ret {
  120. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  121. }
  122. return
  123. }
  124. func (n *NodeImpl) IsDataNode() bool {
  125. return n.nodeType == "DataNode"
  126. }
  127. func (n *NodeImpl) IsRack() bool {
  128. return n.nodeType == "Rack"
  129. }
  130. func (n *NodeImpl) IsDataCenter() bool {
  131. return n.nodeType == "DataCenter"
  132. }
  133. func (n *NodeImpl) String() string {
  134. if n.parent != nil {
  135. return n.parent.String() + ":" + string(n.id)
  136. }
  137. return string(n.id)
  138. }
  139. func (n *NodeImpl) Id() NodeId {
  140. return n.id
  141. }
  142. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  143. freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
  144. if option.DiskType == storage.SsdType {
  145. freeVolumeSlotCount = n.maxSsdVolumeCount - n.ssdVolumeCount
  146. }
  147. if n.ecShardCount > 0 {
  148. freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
  149. }
  150. return freeVolumeSlotCount
  151. }
  152. func (n *NodeImpl) FreeSpace() int64 {
  153. freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount
  154. if n.ecShardCount > 0 {
  155. freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
  156. }
  157. return freeVolumeSlotCount
  158. }
  159. func (n *NodeImpl) SetParent(node Node) {
  160. n.parent = node
  161. }
  162. func (n *NodeImpl) Children() (ret []Node) {
  163. n.RLock()
  164. defer n.RUnlock()
  165. for _, c := range n.children {
  166. ret = append(ret, c)
  167. }
  168. return ret
  169. }
  170. func (n *NodeImpl) Parent() Node {
  171. return n.parent
  172. }
  173. func (n *NodeImpl) GetValue() interface{} {
  174. return n.value
  175. }
  176. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  177. n.RLock()
  178. defer n.RUnlock()
  179. for _, node := range n.children {
  180. freeSpace := node.AvailableSpaceFor(option)
  181. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  182. if freeSpace <= 0 {
  183. continue
  184. }
  185. if r >= freeSpace {
  186. r -= freeSpace
  187. } else {
  188. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  189. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  190. return node.(*DataNode), nil
  191. }
  192. assignedNode, err = node.ReserveOneVolume(r, option)
  193. if err == nil {
  194. return
  195. }
  196. }
  197. }
  198. return nil, errors.New("No free volume slot found!")
  199. }
  200. func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
  201. if maxVolumeCountDelta == 0 {
  202. return
  203. }
  204. atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
  205. if n.parent != nil {
  206. n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
  207. }
  208. }
  209. func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative
  210. if maxSsdVolumeCountDelta == 0 {
  211. return
  212. }
  213. atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta)
  214. if n.parent != nil {
  215. n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta)
  216. }
  217. }
  218. func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
  219. if volumeCountDelta == 0 {
  220. return
  221. }
  222. atomic.AddInt64(&n.volumeCount, volumeCountDelta)
  223. if n.parent != nil {
  224. n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
  225. }
  226. }
  227. func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
  228. if remoteVolumeCountDelta == 0 {
  229. return
  230. }
  231. atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
  232. if n.parent != nil {
  233. n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
  234. }
  235. }
  236. func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative
  237. if ssdVolumeCountDelta == 0 {
  238. return
  239. }
  240. atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta)
  241. if n.parent != nil {
  242. n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta)
  243. }
  244. }
  245. func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
  246. if ecShardCountDelta == 0 {
  247. return
  248. }
  249. atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
  250. if n.parent != nil {
  251. n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
  252. }
  253. }
  254. func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
  255. if activeVolumeCountDelta == 0 {
  256. return
  257. }
  258. atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
  259. if n.parent != nil {
  260. n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
  261. }
  262. }
  263. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  264. if n.maxVolumeId < vid {
  265. n.maxVolumeId = vid
  266. if n.parent != nil {
  267. n.parent.UpAdjustMaxVolumeId(vid)
  268. }
  269. }
  270. }
  271. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  272. return n.maxVolumeId
  273. }
  274. func (n *NodeImpl) GetVolumeCount() int64 {
  275. return n.volumeCount
  276. }
  277. func (n *NodeImpl) GetSsdVolumeCount() int64 {
  278. return n.ssdVolumeCount
  279. }
  280. func (n *NodeImpl) GetEcShardCount() int64 {
  281. return n.ecShardCount
  282. }
  283. func (n *NodeImpl) GetRemoteVolumeCount() int64 {
  284. return n.remoteVolumeCount
  285. }
  286. func (n *NodeImpl) GetActiveVolumeCount() int64 {
  287. return n.activeVolumeCount
  288. }
  289. func (n *NodeImpl) GetMaxVolumeCount() int64 {
  290. return n.maxVolumeCount
  291. }
  292. func (n *NodeImpl) GetMaxSsdVolumeCount() int64 {
  293. return n.maxSsdVolumeCount
  294. }
  295. func (n *NodeImpl) LinkChildNode(node Node) {
  296. n.Lock()
  297. defer n.Unlock()
  298. if n.children[node.Id()] == nil {
  299. n.children[node.Id()] = node
  300. n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
  301. n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount())
  302. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  303. n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
  304. n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount())
  305. n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
  306. n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
  307. n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
  308. node.SetParent(n)
  309. glog.V(0).Infoln(n, "adds child", node.Id())
  310. }
  311. }
  312. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  313. n.Lock()
  314. defer n.Unlock()
  315. node := n.children[nodeId]
  316. if node != nil {
  317. node.SetParent(nil)
  318. delete(n.children, node.Id())
  319. n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
  320. n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount())
  321. n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
  322. n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
  323. n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
  324. n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
  325. n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount())
  326. glog.V(0).Infoln(n, "removes", node.Id())
  327. }
  328. }
  329. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
  330. if n.IsRack() {
  331. for _, c := range n.Children() {
  332. dn := c.(*DataNode) //can not cast n to DataNode
  333. for _, v := range dn.GetVolumes() {
  334. if uint64(v.Size) >= volumeSizeLimit {
  335. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  336. n.GetTopology().chanFullVolumes <- v
  337. }
  338. }
  339. }
  340. } else {
  341. for _, c := range n.Children() {
  342. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
  343. }
  344. }
  345. }
  346. func (n *NodeImpl) GetTopology() *Topology {
  347. var p Node
  348. p = n
  349. for p.Parent() != nil {
  350. p = p.Parent()
  351. }
  352. return p.GetValue().(*Topology)
  353. }