You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
8.6 KiB

6 years ago
6 years ago
6 years ago
6 years ago
5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math/rand"
  7. "sort"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandVolumeFixReplication{})
  15. }
  16. type commandVolumeFixReplication struct {
  17. }
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. This command finds all under-replicated volumes, and finds volume servers with free slots.
  24. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
  25. volume.fix.replication -n # do not take action
  26. volume.fix.replication # actually copying the volume files and mount the volume
  27. Note:
  28. * each time this will only add back one replica for one volume id. If there are multiple replicas
  29. are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
  30. * do not run this too quick within seconds, since the new volume replica may take a few seconds
  31. to register itself to the master.
  32. `
  33. }
  34. func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  35. if err = commandEnv.confirmIsLocked(); err != nil {
  36. return
  37. }
  38. takeAction := true
  39. if len(args) > 0 && args[0] == "-n" {
  40. takeAction = false
  41. }
  42. var resp *master_pb.VolumeListResponse
  43. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  44. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  45. return err
  46. })
  47. if err != nil {
  48. return err
  49. }
  50. // find all volumes that needs replication
  51. // collect all data nodes
  52. replicatedVolumeLocations := make(map[uint32][]location)
  53. replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
  54. var allLocations []location
  55. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  56. loc := newLocation(dc, string(rack), dn)
  57. for _, v := range dn.VolumeInfos {
  58. if v.ReplicaPlacement > 0 {
  59. replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
  60. replicatedVolumeInfo[v.Id] = v
  61. }
  62. }
  63. allLocations = append(allLocations, loc)
  64. })
  65. // find all under replicated volumes
  66. underReplicatedVolumeLocations := make(map[uint32][]location)
  67. for vid, locations := range replicatedVolumeLocations {
  68. volumeInfo := replicatedVolumeInfo[vid]
  69. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  70. if replicaPlacement.GetCopyCount() > len(locations) {
  71. underReplicatedVolumeLocations[vid] = locations
  72. } else if replicaPlacement.GetCopyCount() < len(locations) {
  73. fmt.Fprintf(writer, "volume %d replication %s, but over repliacated:%+v\n", volumeInfo.Id, replicaPlacement, locations)
  74. }
  75. }
  76. if len(underReplicatedVolumeLocations) == 0 {
  77. return fmt.Errorf("no under replicated volumes")
  78. }
  79. if len(allLocations) == 0 {
  80. return fmt.Errorf("no data nodes at all")
  81. }
  82. // find the most under populated data nodes
  83. keepDataNodesSorted(allLocations)
  84. for vid, locations := range underReplicatedVolumeLocations {
  85. volumeInfo := replicatedVolumeInfo[vid]
  86. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  87. foundNewLocation := false
  88. for _, dst := range allLocations {
  89. // check whether data nodes satisfy the constraints
  90. if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
  91. // ask the volume server to replicate the volume
  92. sourceNodes := underReplicatedVolumeLocations[vid]
  93. sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
  94. foundNewLocation = true
  95. fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
  96. if !takeAction {
  97. break
  98. }
  99. err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  100. _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
  101. VolumeId: volumeInfo.Id,
  102. SourceDataNode: sourceNode.dataNode.Id,
  103. })
  104. if replicateErr != nil {
  105. return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
  106. }
  107. return nil
  108. })
  109. if err != nil {
  110. return err
  111. }
  112. // adjust free volume count
  113. dst.dataNode.FreeVolumeCount--
  114. keepDataNodesSorted(allLocations)
  115. break
  116. }
  117. }
  118. if !foundNewLocation {
  119. fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
  120. }
  121. }
  122. return nil
  123. }
  124. func keepDataNodesSorted(dataNodes []location) {
  125. sort.Slice(dataNodes, func(i, j int) bool {
  126. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  127. })
  128. }
  129. /*
  130. if on an existing data node {
  131. return false
  132. }
  133. if different from existing dcs {
  134. if lack on different dcs {
  135. return true
  136. }else{
  137. return false
  138. }
  139. }
  140. if not on primary dc {
  141. return false
  142. }
  143. if different from existing racks {
  144. if lack on different racks {
  145. return true
  146. }else{
  147. return false
  148. }
  149. }
  150. if not on primary rack {
  151. return false
  152. }
  153. if lacks on same rack {
  154. return true
  155. } else {
  156. return false
  157. }
  158. */
  159. func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
  160. existingDataNodes := make(map[string]int)
  161. for _, loc := range existingLocations {
  162. existingDataNodes[loc.String()] += 1
  163. }
  164. sameDataNodeCount := existingDataNodes[possibleLocation.String()]
  165. // avoid duplicated volume on the same data node
  166. if sameDataNodeCount > 0 {
  167. return false
  168. }
  169. existingDataCenters := make(map[string]int)
  170. for _, loc := range existingLocations {
  171. existingDataCenters[loc.DataCenter()] += 1
  172. }
  173. primaryDataCenters, _ := findTopKeys(existingDataCenters)
  174. // ensure data center count is within limit
  175. if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
  176. // different from existing dcs
  177. if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
  178. // lack on different dcs
  179. return true
  180. } else {
  181. // adding this would go over the different dcs limit
  182. return false
  183. }
  184. }
  185. // now this is same as one of the existing data center
  186. if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
  187. // not on one of the primary dcs
  188. return false
  189. }
  190. // now this is one of the primary dcs
  191. existingRacks := make(map[string]int)
  192. for _, loc := range existingLocations {
  193. if loc.DataCenter() != possibleLocation.DataCenter() {
  194. continue
  195. }
  196. existingRacks[loc.Rack()] += 1
  197. }
  198. primaryRacks, _ := findTopKeys(existingRacks)
  199. sameRackCount := existingRacks[possibleLocation.Rack()]
  200. // ensure rack count is within limit
  201. if _, found := existingRacks[possibleLocation.Rack()]; !found {
  202. // different from existing racks
  203. if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
  204. // lack on different racks
  205. return true
  206. } else {
  207. // adding this would go over the different racks limit
  208. return false
  209. }
  210. }
  211. // now this is same as one of the existing racks
  212. if !isAmong(possibleLocation.Rack(), primaryRacks) {
  213. // not on the primary rack
  214. return false
  215. }
  216. // now this is on the primary rack
  217. // different from existing data nodes
  218. if sameRackCount < replicaPlacement.SameRackCount+1 {
  219. // lack on same rack
  220. return true
  221. } else {
  222. // adding this would go over the same data node limit
  223. return false
  224. }
  225. }
  226. func findTopKeys(m map[string]int) (topKeys []string, max int) {
  227. for k, c := range m {
  228. if max < c {
  229. topKeys = topKeys[:0]
  230. topKeys = append(topKeys, k)
  231. max = c
  232. } else if max == c {
  233. topKeys = append(topKeys, k)
  234. }
  235. }
  236. return
  237. }
  238. func isAmong(key string, keys []string) bool {
  239. for _, k := range keys {
  240. if k == key {
  241. return true
  242. }
  243. }
  244. return false
  245. }
  246. type location struct {
  247. dc string
  248. rack string
  249. dataNode *master_pb.DataNodeInfo
  250. }
  251. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  252. return location{
  253. dc: dc,
  254. rack: rack,
  255. dataNode: dataNode,
  256. }
  257. }
  258. func (l location) String() string {
  259. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  260. }
  261. func (l location) Rack() string {
  262. return fmt.Sprintf("%s %s", l.dc, l.rack)
  263. }
  264. func (l location) DataCenter() string {
  265. return l.dc
  266. }