348 lines
7.2 KiB

6 years ago
6 years ago
6 years ago
4 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package topology
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "testing"
  6. "github.com/chrislusf/seaweedfs/weed/sequence"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  10. )
// topologyLayout is the JSON fixture used by TestFindEmptySlotsForOneVolume.
//
// It is consumed by setup, which reads it as nested objects:
// data center -> rack -> server. Each server object carries a "volumes"
// array of {"id", "size"} entries and a numeric "limit".
//
// The fixture contains "dc1" (rack1 with two servers, rack2 with three
// servers), an empty "dc2", and "dc3" (one rack with one server).
var topologyLayout = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[
{"id":1, "size":12312},
{"id":2, "size":12312},
{"id":3, "size":12312}
],
"limit":3
},
"server112":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":10
}
},
"rack2":{
"server121":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":4
},
"server122":{
"volumes":[],
"limit":4
},
"server123":{
"volumes":[
{"id":2, "size":12312},
{"id":3, "size":12312},
{"id":4, "size":12312}
],
"limit":5
}
}
},
"dc2":{
},
"dc3":{
"rack2":{
"server321":{
"volumes":[
{"id":1, "size":12312},
{"id":3, "size":12312},
{"id":5, "size":12312}
],
"limit":4
}
}
}
}
`
  71. func setup(topologyLayout string) *Topology {
  72. var data interface{}
  73. err := json.Unmarshal([]byte(topologyLayout), &data)
  74. if err != nil {
  75. fmt.Println("error:", err)
  76. }
  77. fmt.Println("data:", data)
  78. //need to connect all nodes first before server adding volumes
  79. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  80. mTopology := data.(map[string]interface{})
  81. for dcKey, dcValue := range mTopology {
  82. dc := NewDataCenter(dcKey)
  83. dcMap := dcValue.(map[string]interface{})
  84. topo.LinkChildNode(dc)
  85. for rackKey, rackValue := range dcMap {
  86. rack := NewRack(rackKey)
  87. rackMap := rackValue.(map[string]interface{})
  88. dc.LinkChildNode(rack)
  89. for serverKey, serverValue := range rackMap {
  90. server := NewDataNode(serverKey)
  91. serverMap := serverValue.(map[string]interface{})
  92. rack.LinkChildNode(server)
  93. for _, v := range serverMap["volumes"].([]interface{}) {
  94. m := v.(map[string]interface{})
  95. vi := storage.VolumeInfo{
  96. Id: needle.VolumeId(int64(m["id"].(float64))),
  97. Size: uint64(m["size"].(float64)),
  98. Version: needle.CurrentVersion}
  99. server.AddOrUpdateVolume(vi)
  100. }
  101. disk := server.getOrCreateDisk("")
  102. deltaDiskUsages := newDiskUsages()
  103. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
  104. deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
  105. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  106. }
  107. }
  108. }
  109. return topo
  110. }
  111. func TestFindEmptySlotsForOneVolume(t *testing.T) {
  112. topo := setup(topologyLayout)
  113. vg := NewDefaultVolumeGrowth()
  114. rp, _ := super_block.NewReplicaPlacementFromString("002")
  115. volumeGrowOption := &VolumeGrowOption{
  116. Collection: "",
  117. ReplicaPlacement: rp,
  118. DataCenter: "dc1",
  119. Rack: "",
  120. DataNode: "",
  121. }
  122. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  123. if err != nil {
  124. fmt.Println("finding empty slots error :", err)
  125. t.Fail()
  126. }
  127. for _, server := range servers {
  128. fmt.Println("assigned node :", server.Id())
  129. }
  130. }
// topologyLayout2 is the JSON fixture used by TestReplication011.
//
// It is consumed by setup, which reads it as nested objects:
// data center -> rack -> server. Each server object carries a "volumes"
// array of {"id", "size"} entries and a numeric "limit".
//
// The fixture has a single data center "dc1" with three racks of six
// servers each; every server has "limit" 300.
var topologyLayout2 = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[
{"id":1, "size":12312},
{"id":2, "size":12312},
{"id":3, "size":12312}
],
"limit":300
},
"server112":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":300
},
"server113":{
"volumes":[],
"limit":300
},
"server114":{
"volumes":[],
"limit":300
},
"server115":{
"volumes":[],
"limit":300
},
"server116":{
"volumes":[],
"limit":300
}
},
"rack2":{
"server121":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":300
},
"server122":{
"volumes":[],
"limit":300
},
"server123":{
"volumes":[
{"id":2, "size":12312},
{"id":3, "size":12312},
{"id":4, "size":12312}
],
"limit":300
},
"server124":{
"volumes":[],
"limit":300
},
"server125":{
"volumes":[],
"limit":300
},
"server126":{
"volumes":[],
"limit":300
}
},
"rack3":{
"server131":{
"volumes":[],
"limit":300
},
"server132":{
"volumes":[],
"limit":300
},
"server133":{
"volumes":[],
"limit":300
},
"server134":{
"volumes":[],
"limit":300
},
"server135":{
"volumes":[],
"limit":300
},
"server136":{
"volumes":[],
"limit":300
}
}
}
}
`
  231. func TestReplication011(t *testing.T) {
  232. topo := setup(topologyLayout2)
  233. vg := NewDefaultVolumeGrowth()
  234. rp, _ := super_block.NewReplicaPlacementFromString("011")
  235. volumeGrowOption := &VolumeGrowOption{
  236. Collection: "MAIL",
  237. ReplicaPlacement: rp,
  238. DataCenter: "dc1",
  239. Rack: "",
  240. DataNode: "",
  241. }
  242. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  243. if err != nil {
  244. fmt.Println("finding empty slots error :", err)
  245. t.Fail()
  246. }
  247. for _, server := range servers {
  248. fmt.Println("assigned node :", server.Id())
  249. }
  250. }
// topologyLayout3 is the JSON fixture used by
// TestFindEmptySlotsForOneVolumeScheduleByWeight.
//
// It is consumed by setup, which reads it as nested objects:
// data center -> rack -> server. Each server object carries a "volumes"
// array and a numeric "limit".
//
// The fixture has six data centers ("dc1".."dc6"), each with one rack and
// one server that holds no volumes. The limits differ per data center:
// 2000 for dc1/dc2, 1000 for dc3/dc4 and 500 for dc5/dc6.
var topologyLayout3 = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[],
"limit":2000
}
}
},
"dc2":{
"rack2":{
"server222":{
"volumes":[],
"limit":2000
}
}
},
"dc3":{
"rack3":{
"server333":{
"volumes":[],
"limit":1000
}
}
},
"dc4":{
"rack4":{
"server444":{
"volumes":[],
"limit":1000
}
}
},
"dc5":{
"rack5":{
"server555":{
"volumes":[],
"limit":500
}
}
},
"dc6":{
"rack6":{
"server666":{
"volumes":[],
"limit":500
}
}
}
}
`
  303. func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
  304. topo := setup(topologyLayout3)
  305. vg := NewDefaultVolumeGrowth()
  306. rp, _ := super_block.NewReplicaPlacementFromString("100")
  307. volumeGrowOption := &VolumeGrowOption{
  308. Collection: "Weight",
  309. ReplicaPlacement: rp,
  310. DataCenter: "",
  311. Rack: "",
  312. DataNode: "",
  313. }
  314. distribution := map[NodeId]int{}
  315. // assign 1000 volumes
  316. for i := 0; i < 1000; i++ {
  317. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  318. if err != nil {
  319. fmt.Println("finding empty slots error :", err)
  320. t.Fail()
  321. }
  322. for _, server := range servers {
  323. // fmt.Println("assigned node :", server.Id())
  324. if _, ok := distribution[server.id]; !ok {
  325. distribution[server.id] = 0
  326. }
  327. distribution[server.id] += 1
  328. }
  329. }
  330. for k, v := range distribution {
  331. fmt.Printf("%s : %d\n", k, v)
  332. }
  333. }