You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

196 lines
6.0 KiB

13 years ago
6 years ago
6 years ago
13 years ago
4 years ago
7 years ago
6 years ago
7 years ago
6 years ago
6 years ago
6 years ago
4 years ago
6 years ago
6 years ago
7 years ago
7 years ago
  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  4. "github.com/chrislusf/seaweedfs/weed/sequence"
  5. "github.com/chrislusf/seaweedfs/weed/storage"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "testing"
  10. )
  11. func TestRemoveDataCenter(t *testing.T) {
  12. topo := setup(topologyLayout)
  13. topo.UnlinkChildNode(NodeId("dc2"))
  14. if topo.GetActiveVolumeCount() != 15 {
  15. t.Fail()
  16. }
  17. topo.UnlinkChildNode(NodeId("dc3"))
  18. if topo.GetActiveVolumeCount() != 12 {
  19. t.Fail()
  20. }
  21. }
  22. func TestHandlingVolumeServerHeartbeat(t *testing.T) {
  23. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  24. dc := topo.GetOrCreateDataCenter("dc1")
  25. rack := dc.GetOrCreateRack("rack1")
  26. dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
  27. {
  28. volumeCount := 7
  29. var volumeMessages []*master_pb.VolumeInformationMessage
  30. for k := 1; k <= volumeCount; k++ {
  31. volumeMessage := &master_pb.VolumeInformationMessage{
  32. Id: uint32(k),
  33. Size: uint64(25432),
  34. Collection: "",
  35. FileCount: uint64(2343),
  36. DeleteCount: uint64(345),
  37. DeletedByteCount: 34524,
  38. ReadOnly: false,
  39. ReplicaPlacement: uint32(0),
  40. Version: uint32(needle.CurrentVersion),
  41. Ttl: 0,
  42. }
  43. volumeMessages = append(volumeMessages, volumeMessage)
  44. }
  45. for k := 1; k <= volumeCount; k++ {
  46. volumeMessage := &master_pb.VolumeInformationMessage{
  47. Id: uint32(volumeCount + k),
  48. Size: uint64(25432),
  49. Collection: "",
  50. FileCount: uint64(2343),
  51. DeleteCount: uint64(345),
  52. DeletedByteCount: 34524,
  53. ReadOnly: false,
  54. ReplicaPlacement: uint32(0),
  55. Version: uint32(needle.CurrentVersion),
  56. Ttl: 0,
  57. DiskType: "ssd",
  58. }
  59. volumeMessages = append(volumeMessages, volumeMessage)
  60. }
  61. topo.SyncDataNodeRegistration(volumeMessages, dn)
  62. assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount*2)
  63. assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
  64. assert(t, "ssdVolumeCount", int(topo.ssdVolumeCount), volumeCount)
  65. }
  66. {
  67. volumeCount := 7 - 1
  68. var volumeMessages []*master_pb.VolumeInformationMessage
  69. for k := 1; k <= volumeCount; k++ {
  70. volumeMessage := &master_pb.VolumeInformationMessage{
  71. Id: uint32(k),
  72. Size: uint64(254320),
  73. Collection: "",
  74. FileCount: uint64(2343),
  75. DeleteCount: uint64(345),
  76. DeletedByteCount: 345240,
  77. ReadOnly: false,
  78. ReplicaPlacement: uint32(0),
  79. Version: uint32(needle.CurrentVersion),
  80. Ttl: 0,
  81. }
  82. volumeMessages = append(volumeMessages, volumeMessage)
  83. }
  84. topo.SyncDataNodeRegistration(volumeMessages, dn)
  85. //rp, _ := storage.NewReplicaPlacementFromString("000")
  86. //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
  87. //assert(t, "writables", len(layout.writables), volumeCount)
  88. assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
  89. assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
  90. }
  91. {
  92. volumeCount := 6
  93. newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
  94. Id: uint32(3),
  95. Collection: "",
  96. ReplicaPlacement: uint32(0),
  97. Version: uint32(needle.CurrentVersion),
  98. Ttl: 0,
  99. }
  100. topo.IncrementalSyncDataNodeRegistration(
  101. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  102. nil,
  103. dn)
  104. rp, _ := super_block.NewReplicaPlacementFromString("000")
  105. layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
  106. assert(t, "writables after repeated add", len(layout.writables), volumeCount)
  107. assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
  108. assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
  109. topo.IncrementalSyncDataNodeRegistration(
  110. nil,
  111. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  112. dn)
  113. assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
  114. assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
  115. assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
  116. topo.IncrementalSyncDataNodeRegistration(
  117. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  118. nil,
  119. dn)
  120. for vid := range layout.vid2location {
  121. println("after add volume id", vid)
  122. }
  123. for _, vid := range layout.writables {
  124. println("after add writable volume id", vid)
  125. }
  126. assert(t, "writables after add back", len(layout.writables), volumeCount)
  127. }
  128. topo.UnRegisterDataNode(dn)
  129. assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
  130. }
  131. func assert(t *testing.T, message string, actual, expected int) {
  132. if actual != expected {
  133. t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
  134. }
  135. }
  136. func TestAddRemoveVolume(t *testing.T) {
  137. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  138. dc := topo.GetOrCreateDataCenter("dc1")
  139. rack := dc.GetOrCreateRack("rack1")
  140. dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
  141. v := storage.VolumeInfo{
  142. Id: needle.VolumeId(1),
  143. Size: 100,
  144. Collection: "xcollection",
  145. DiskType: "ssd",
  146. FileCount: 123,
  147. DeleteCount: 23,
  148. DeletedByteCount: 45,
  149. ReadOnly: false,
  150. Version: needle.CurrentVersion,
  151. ReplicaPlacement: &super_block.ReplicaPlacement{},
  152. Ttl: needle.EMPTY_TTL,
  153. }
  154. dn.UpdateVolumes([]storage.VolumeInfo{v})
  155. topo.RegisterVolumeLayout(v, dn)
  156. topo.RegisterVolumeLayout(v, dn)
  157. if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
  158. t.Errorf("collection %v should exist", v.Collection)
  159. }
  160. topo.UnRegisterVolumeLayout(v, dn)
  161. if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
  162. t.Errorf("collection %v should not exist", v.Collection)
  163. }
  164. }