You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

194 lines
5.7 KiB

  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage"
  9. "io"
  10. "math/rand"
  11. "sort"
  12. )
  13. func init() {
  14. commands = append(commands, &commandVolumeFixReplication{})
  15. }
  16. type commandVolumeFixReplication struct {
  17. }
  18. func (c *commandVolumeFixReplication) Name() string {
  19. return "volume.fix.replication"
  20. }
  21. func (c *commandVolumeFixReplication) Help() string {
  22. return `add replicas to volumes that are missing replicas
  23. -n do not take action
  24. `
  25. }
  26. func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
  27. takeAction := true
  28. if len(args) > 0 && args[0] == "-n" {
  29. takeAction = false
  30. }
  31. var resp *master_pb.VolumeListResponse
  32. ctx := context.Background()
  33. err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  34. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  35. return err
  36. })
  37. if err != nil {
  38. return err
  39. }
  40. // find all volumes that needs replication
  41. // collect all data nodes
  42. replicatedVolumeLocations := make(map[uint32][]location)
  43. replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
  44. var allLocations []location
  45. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  46. for _, rack := range dc.RackInfos {
  47. for _, dn := range rack.DataNodeInfos {
  48. loc := newLocation(dc.Id, rack.Id, dn)
  49. for _, v := range dn.VolumeInfos {
  50. if v.ReplicaPlacement > 0 {
  51. replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
  52. replicatedVolumeInfo[v.Id] = v
  53. }
  54. }
  55. allLocations = append(allLocations, loc)
  56. }
  57. }
  58. }
  59. // find all under replicated volumes
  60. underReplicatedVolumeLocations := make(map[uint32][]location)
  61. for vid, locations := range replicatedVolumeLocations {
  62. volumeInfo := replicatedVolumeInfo[vid]
  63. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  64. if replicaPlacement.GetCopyCount() > len(locations) {
  65. underReplicatedVolumeLocations[vid] = locations
  66. }
  67. }
  68. if len(underReplicatedVolumeLocations) == 0 {
  69. return fmt.Errorf("no under replicated volumes")
  70. }
  71. if len(allLocations) == 0 {
  72. return fmt.Errorf("no data nodes at all")
  73. }
  74. // find the most under populated data nodes
  75. keepDataNodesSorted(allLocations)
  76. for vid, locations := range underReplicatedVolumeLocations {
  77. volumeInfo := replicatedVolumeInfo[vid]
  78. replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
  79. foundNewLocation := false
  80. for _, dst := range allLocations {
  81. // check whether data nodes satisfy the constraints
  82. if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
  83. // ask the volume server to replicate the volume
  84. sourceNodes := underReplicatedVolumeLocations[vid]
  85. sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
  86. foundNewLocation = true
  87. fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
  88. if !takeAction {
  89. break
  90. }
  91. err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  92. _, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
  93. VolumeId: volumeInfo.Id,
  94. Collection: volumeInfo.Collection,
  95. SourceDataNode: sourceNode.dataNode.Id,
  96. })
  97. return replicateErr
  98. })
  99. if err != nil {
  100. return err
  101. }
  102. // adjust free volume count
  103. dst.dataNode.FreeVolumeCount--
  104. keepDataNodesSorted(allLocations)
  105. break
  106. }
  107. }
  108. if !foundNewLocation {
  109. fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
  110. }
  111. }
  112. return nil
  113. }
  114. func keepDataNodesSorted(dataNodes []location) {
  115. sort.Slice(dataNodes, func(i, j int) bool {
  116. return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
  117. })
  118. }
  119. func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
  120. existingDataCenters := make(map[string]bool)
  121. existingRacks := make(map[string]bool)
  122. existingDataNodes := make(map[string]bool)
  123. for _, loc := range existingLocations {
  124. existingDataCenters[loc.DataCenter()] = true
  125. existingRacks[loc.Rack()] = true
  126. existingDataNodes[loc.String()] = true
  127. }
  128. if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
  129. // check dc, good if different from any existing data centers
  130. _, found := existingDataCenters[possibleLocation.DataCenter()]
  131. return !found
  132. } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
  133. // check rack, good if different from any existing racks
  134. _, found := existingRacks[possibleLocation.Rack()]
  135. return !found
  136. } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
  137. // check data node, good if different from any existing data nodes
  138. _, found := existingDataNodes[possibleLocation.String()]
  139. return !found
  140. }
  141. return false
  142. }
  143. type location struct {
  144. dc string
  145. rack string
  146. dataNode *master_pb.DataNodeInfo
  147. }
  148. func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
  149. return location{
  150. dc: dc,
  151. rack: rack,
  152. dataNode: dataNode,
  153. }
  154. }
  155. func (l location) String() string {
  156. return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
  157. }
  158. func (l location) Rack() string {
  159. return fmt.Sprintf("%s %s", l.dc, l.rack)
  160. }
  161. func (l location) DataCenter() string {
  162. return l.dc
  163. }