You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

200 lines
6.2 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/joeslay/seaweedfs/weed/operation"
  6. "github.com/joeslay/seaweedfs/weed/pb/master_pb"
  7. "github.com/joeslay/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/joeslay/seaweedfs/weed/storage"
  9. "io"
  10. "math/rand"
  11. "sort"
  12. )
  13. func init() {
  14. Commands = append(Commands, &commandVolumeFixReplication{})
  15. }
  16. type commandVolumeFixReplication struct {
  17. }
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. This command file all under-replicated volumes, and find volume servers with free slots.
  24. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
  25. volume.fix.replication -n # do not take action
  26. volume.fix.replication # actually copying the volume files and mount the volume
  27. Note:
  28. * each time this will only add back one replica for one volume id. If there are multiple replicas
  29. are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
  30. * do not run this too quick within seconds, since the new volume replica may take a few seconds
  31. to register itself to the master.
  32. `
  33. }
  34. func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  35. takeAction := true
  36. if len(args) > 0 && args[0] == "-n" {
  37. takeAction = false
  38. }
  39. var resp *master_pb.VolumeListResponse
  40. ctx := context.Background()
  41. err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  42. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  43. return err
  44. })
  45. if err != nil {
  46. return err
  47. }
  48. // find all volumes that needs replication
  49. // collect all data nodes
  50. replicatedVolumeLocations := make(map[uint32][]location)
  51. replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
  52. var allLocations []location
  53. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  54. loc := newLocation(dc, string(rack), dn)
  55. for _, v := range dn.VolumeInfos {
  56. if v.ReplicaPlacement > 0 {
  57. replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
  58. replicatedVolumeInfo[v.Id] = v
  59. }
  60. }
  61. allLocations = append(allLocations, loc)
  62. })
  63. // find all under replicated volumes
  64. underReplicatedVolumeLocations := make(map[uint32][]location)
  65. for vid, locations := range replicatedVolumeLocations {
  66. volumeInfo := replicatedVolumeInfo[vid]
  67. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  68. if replicaPlacement.GetCopyCount() > len(locations) {
  69. underReplicatedVolumeLocations[vid] = locations
  70. }
  71. }
  72. if len(underReplicatedVolumeLocations) == 0 {
  73. return fmt.Errorf("no under replicated volumes")
  74. }
  75. if len(allLocations) == 0 {
  76. return fmt.Errorf("no data nodes at all")
  77. }
  78. // find the most under populated data nodes
  79. keepDataNodesSorted(allLocations)
  80. for vid, locations := range underReplicatedVolumeLocations {
  81. volumeInfo := replicatedVolumeInfo[vid]
  82. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  83. foundNewLocation := false
  84. for _, dst := range allLocations {
  85. // check whether data nodes satisfy the constraints
  86. if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
  87. // ask the volume server to replicate the volume
  88. sourceNodes := underReplicatedVolumeLocations[vid]
  89. sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
  90. foundNewLocation = true
  91. fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
  92. if !takeAction {
  93. break
  94. }
  95. err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  96. _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
  97. VolumeId: volumeInfo.Id,
  98. SourceDataNode: sourceNode.dataNode.Id,
  99. })
  100. return replicateErr
  101. })
  102. if err != nil {
  103. return err
  104. }
  105. // adjust free volume count
  106. dst.dataNode.FreeVolumeCount--
  107. keepDataNodesSorted(allLocations)
  108. break
  109. }
  110. }
  111. if !foundNewLocation {
  112. fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
  113. }
  114. }
  115. return nil
  116. }
  117. func keepDataNodesSorted(dataNodes []location) {
  118. sort.Slice(dataNodes, func(i, j int) bool {
  119. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  120. })
  121. }
  122. func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
  123. existingDataCenters := make(map[string]bool)
  124. existingRacks := make(map[string]bool)
  125. existingDataNodes := make(map[string]bool)
  126. for _, loc := range existingLocations {
  127. existingDataCenters[loc.DataCenter()] = true
  128. existingRacks[loc.Rack()] = true
  129. existingDataNodes[loc.String()] = true
  130. }
  131. if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
  132. // check dc, good if different from any existing data centers
  133. _, found := existingDataCenters[possibleLocation.DataCenter()]
  134. return !found
  135. } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
  136. // check rack, good if different from any existing racks
  137. _, found := existingRacks[possibleLocation.Rack()]
  138. return !found
  139. } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
  140. // check data node, good if different from any existing data nodes
  141. _, found := existingDataNodes[possibleLocation.String()]
  142. return !found
  143. }
  144. return false
  145. }
  146. type location struct {
  147. dc string
  148. rack string
  149. dataNode *master_pb.DataNodeInfo
  150. }
  151. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  152. return location{
  153. dc: dc,
  154. rack: rack,
  155. dataNode: dataNode,
  156. }
  157. }
  158. func (l location) String() string {
  159. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  160. }
  161. func (l location) Rack() string {
  162. return fmt.Sprintf("%s %s", l.dc, l.rack)
  163. }
  164. func (l location) DataCenter() string {
  165. return l.dc
  166. }