You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

199 lines
6.0 KiB

  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage"
  9. "io"
  10. "math/rand"
  11. "sort"
  12. )
  13. func init() {
  14. commands = append(commands, &commandVolumeFixReplication{})
  15. }
  // commandVolumeFixReplication is the receiver type for the
  // "volume.fix.replication" command. It has no fields.
  type commandVolumeFixReplication struct{}
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. This command file all under-replicated volumes, and find volume servers with free slots.
  24. If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
  25. volume.fix.replication -n # do not take action
  26. volume.fix.replication # actually copying the volume files and mount the volume
  27. `
  28. }
  29. func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
  30. takeAction := true
  31. if len(args) > 0 && args[0] == "-n" {
  32. takeAction = false
  33. }
  34. var resp *master_pb.VolumeListResponse
  35. ctx := context.Background()
  36. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  37. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  38. return err
  39. })
  40. if err != nil {
  41. return err
  42. }
  43. // find all volumes that needs replication
  44. // collect all data nodes
  45. replicatedVolumeLocations := make(map[uint32][]location)
  46. replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
  47. var allLocations []location
  48. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  49. for _, rack := range dc.RackInfos {
  50. for _, dn := range rack.DataNodeInfos {
  51. loc := newLocation(dc.Id, rack.Id, dn)
  52. for _, v := range dn.VolumeInfos {
  53. if v.ReplicaPlacement > 0 {
  54. replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
  55. replicatedVolumeInfo[v.Id] = v
  56. }
  57. }
  58. allLocations = append(allLocations, loc)
  59. }
  60. }
  61. }
  62. // find all under replicated volumes
  63. underReplicatedVolumeLocations := make(map[uint32][]location)
  64. for vid, locations := range replicatedVolumeLocations {
  65. volumeInfo := replicatedVolumeInfo[vid]
  66. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  67. if replicaPlacement.GetCopyCount() > len(locations) {
  68. underReplicatedVolumeLocations[vid] = locations
  69. }
  70. }
  71. if len(underReplicatedVolumeLocations) == 0 {
  72. return fmt.Errorf("no under replicated volumes")
  73. }
  74. if len(allLocations) == 0 {
  75. return fmt.Errorf("no data nodes at all")
  76. }
  77. // find the most under populated data nodes
  78. keepDataNodesSorted(allLocations)
  79. for vid, locations := range underReplicatedVolumeLocations {
  80. volumeInfo := replicatedVolumeInfo[vid]
  81. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  82. foundNewLocation := false
  83. for _, dst := range allLocations {
  84. // check whether data nodes satisfy the constraints
  85. if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
  86. // ask the volume server to replicate the volume
  87. sourceNodes := underReplicatedVolumeLocations[vid]
  88. sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
  89. foundNewLocation = true
  90. fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
  91. if !takeAction {
  92. break
  93. }
  94. err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  95. _, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
  96. VolumeId: volumeInfo.Id,
  97. Collection: volumeInfo.Collection,
  98. SourceDataNode: sourceNode.dataNode.Id,
  99. })
  100. return replicateErr
  101. })
  102. if err != nil {
  103. return err
  104. }
  105. // adjust free volume count
  106. dst.dataNode.FreeVolumeCount--
  107. keepDataNodesSorted(allLocations)
  108. break
  109. }
  110. }
  111. if !foundNewLocation {
  112. fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
  113. }
  114. }
  115. return nil
  116. }
  117. func keepDataNodesSorted(dataNodes []location) {
  118. sort.Slice(dataNodes, func(i, j int) bool {
  119. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  120. })
  121. }
  122. func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
  123. existingDataCenters := make(map[string]bool)
  124. existingRacks := make(map[string]bool)
  125. existingDataNodes := make(map[string]bool)
  126. for _, loc := range existingLocations {
  127. existingDataCenters[loc.DataCenter()] = true
  128. existingRacks[loc.Rack()] = true
  129. existingDataNodes[loc.String()] = true
  130. }
  131. if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
  132. // check dc, good if different from any existing data centers
  133. _, found := existingDataCenters[possibleLocation.DataCenter()]
  134. return !found
  135. } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
  136. // check rack, good if different from any existing racks
  137. _, found := existingRacks[possibleLocation.Rack()]
  138. return !found
  139. } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
  140. // check data node, good if different from any existing data nodes
  141. _, found := existingDataNodes[possibleLocation.String()]
  142. return !found
  143. }
  144. return false
  145. }
  146. type location struct {
  147. dc string
  148. rack string
  149. dataNode *master_pb.DataNodeInfo
  150. }
  151. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  152. return location{
  153. dc: dc,
  154. rack: rack,
  155. dataNode: dataNode,
  156. }
  157. }
  158. func (l location) String() string {
  159. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  160. }
  161. func (l location) Rack() string {
  162. return fmt.Sprintf("%s %s", l.dc, l.rack)
  163. }
  164. func (l location) DataCenter() string {
  165. return l.dc
  166. }