You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

200 lines
6.2 KiB

6 years ago
6 years ago
6 years ago
6 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math/rand"
  7. "sort"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  12. )
// init registers this command in the shell's global command registry so it
// can be invoked as "volume.fix.replication" from weed shell.
func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}
  16. type commandVolumeFixReplication struct {
  17. }
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. This command file all under-replicated volumes, and find volume servers with free slots.
  24. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
  25. volume.fix.replication -n # do not take action
  26. volume.fix.replication # actually copying the volume files and mount the volume
  27. Note:
  28. * each time this will only add back one replica for one volume id. If there are multiple replicas
  29. are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
  30. * do not run this too quick within seconds, since the new volume replica may take a few seconds
  31. to register itself to the master.
  32. `
  33. }
// Do scans the cluster topology for volumes whose live replica count is below
// what their replica placement requires, then — unless invoked with "-n" —
// asks a suitable volume server to copy one missing replica per volume.
//
// args:   optional "-n" as the first argument makes this a dry run.
// writer: receives human-readable progress / failure messages.
//
// Note: only one replica is added per under-replicated volume per invocation
// (see Help text); the source replica is picked at random.
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	takeAction := true
	if len(args) > 0 && args[0] == "-n" {
		// dry run: report planned copies but skip the VolumeCopy RPC
		takeAction = false
	}
	// fetch the full volume/topology listing from the master
	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}
	// find all volumes that needs replication
	// collect all data nodes
	replicatedVolumeLocations := make(map[uint32][]location)                        // volume id -> nodes currently holding a replica
	replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)    // volume id -> one representative volume info
	var allLocations []location                                                     // every data node, replica holder or not
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, v := range dn.VolumeInfos {
			// only volumes with a non-trivial replica placement participate
			if v.ReplicaPlacement > 0 {
				replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
				replicatedVolumeInfo[v.Id] = v
			}
		}
		allLocations = append(allLocations, loc)
	})
	// find all under replicated volumes
	underReplicatedVolumeLocations := make(map[uint32][]location)
	for vid, locations := range replicatedVolumeLocations {
		volumeInfo := replicatedVolumeInfo[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
		// under-replicated when the placement demands more copies than exist
		if replicaPlacement.GetCopyCount() > len(locations) {
			underReplicatedVolumeLocations[vid] = locations
		}
	}
	if len(underReplicatedVolumeLocations) == 0 {
		return fmt.Errorf("no under replicated volumes")
	}
	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}
	// find the most under populated data nodes
	keepDataNodesSorted(allLocations)
	for vid, locations := range underReplicatedVolumeLocations {
		volumeInfo := replicatedVolumeInfo[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
		foundNewLocation := false
		// allLocations is sorted most-free-first, so the first acceptable
		// candidate is also the emptiest one
		for _, dst := range allLocations {
			// check whether data nodes satisfy the constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
				// ask the volume server to replicate the volume
				sourceNodes := underReplicatedVolumeLocations[vid]
				// pick a random existing replica as the copy source
				sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
				if !takeAction {
					// dry run: stop after reporting the planned copy
					break
				}
				// the destination server pulls the volume from the source node
				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       volumeInfo.Id,
						SourceDataNode: sourceNode.dataNode.Id,
					})
					return replicateErr
				})
				if err != nil {
					return err
				}
				// adjust free volume count
				// (dst.dataNode is a shared pointer, so this is visible to
				// later iterations; re-sort to keep most-free-first order)
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
		}
	}
	return nil
}
  116. func keepDataNodesSorted(dataNodes []location) {
  117. sort.Slice(dataNodes, func(i, j int) bool {
  118. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  119. })
  120. }
  121. func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
  122. existingDataCenters := make(map[string]bool)
  123. existingRacks := make(map[string]bool)
  124. existingDataNodes := make(map[string]bool)
  125. for _, loc := range existingLocations {
  126. existingDataCenters[loc.DataCenter()] = true
  127. existingRacks[loc.Rack()] = true
  128. existingDataNodes[loc.String()] = true
  129. }
  130. if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
  131. // check dc, good if different from any existing data centers
  132. _, found := existingDataCenters[possibleLocation.DataCenter()]
  133. return !found
  134. } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
  135. // check rack, good if different from any existing racks
  136. _, found := existingRacks[possibleLocation.Rack()]
  137. return !found
  138. } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
  139. // check data node, good if different from any existing data nodes
  140. _, found := existingDataNodes[possibleLocation.String()]
  141. return !found
  142. }
  143. return false
  144. }
// location identifies one volume server (data node) together with where it
// sits in the topology: its data center and rack.
type location struct {
	dc       string                  // data center name
	rack     string                  // rack name within the data center
	dataNode *master_pb.DataNodeInfo // the volume server; FreeVolumeCount is mutated as replicas are placed
}
  150. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  151. return location{
  152. dc: dc,
  153. rack: rack,
  154. dataNode: dataNode,
  155. }
  156. }
  157. func (l location) String() string {
  158. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  159. }
  160. func (l location) Rack() string {
  161. return fmt.Sprintf("%s %s", l.dc, l.rack)
  162. }
// DataCenter returns the name of the data center this location belongs to.
func (l location) DataCenter() string {
	return l.dc
}