You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
5.0 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "io"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. )
  11. func init() {
  12. Commands = append(Commands, &commandVolumeTierMove{})
  13. }
  14. type commandVolumeTierMove struct {
  15. }
  16. func (c *commandVolumeTierMove) Name() string {
  17. return "volume.tier.move"
  18. }
  19. func (c *commandVolumeTierMove) Help() string {
  20. return `change a volume from one disk type to another
  21. volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
  22. Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
  23. So "volume.fix.replication" and "volume.balance" should be followed.
  24. `
  25. }
  26. func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  27. if err = commandEnv.confirmIsLocked(); err != nil {
  28. return
  29. }
  30. tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  31. collection := tierCommand.String("collection", "", "the collection name")
  32. fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  33. quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
  34. source := tierCommand.String("fromDiskType", "", "the source disk type")
  35. target := tierCommand.String("toDiskType", "", "the target disk type")
  36. applyChange := tierCommand.Bool("force", false, "actually apply the changes")
  37. if err = tierCommand.Parse(args); err != nil {
  38. return nil
  39. }
  40. fromDiskType := types.ToDiskType(*source)
  41. toDiskType := types.ToDiskType(*target)
  42. if fromDiskType == toDiskType {
  43. return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType)
  44. }
  45. // collect topology information
  46. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
  47. if err != nil {
  48. return err
  49. }
  50. // collect all volumes that should change
  51. volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
  52. if err != nil {
  53. return err
  54. }
  55. fmt.Printf("tier move volumes: %v\n", volumeIds)
  56. _, allLocations := collectVolumeReplicaLocations(topologyInfo)
  57. for _, vid := range volumeIds {
  58. if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
  59. fmt.Printf("tier move volume %d: %v\n", vid, err)
  60. }
  61. }
  62. return nil
  63. }
  64. func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
  65. // find volume location
  66. locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
  67. if !found {
  68. return fmt.Errorf("volume %d not found", vid)
  69. }
  70. // find one server with the most empty volume slots with target disk type
  71. hasFoundTarget := false
  72. keepDataNodesSorted(allLocations, toDiskType)
  73. fn := capacityByFreeVolumeCount(toDiskType)
  74. for _, dst := range allLocations {
  75. if fn(dst.dataNode) > 0 {
  76. // ask the volume server to replicate the volume
  77. sourceVolumeServer := ""
  78. for _, loc := range locations {
  79. if loc.Url != dst.dataNode.Id {
  80. sourceVolumeServer = loc.Url
  81. }
  82. }
  83. if sourceVolumeServer == "" {
  84. continue
  85. }
  86. fmt.Fprintf(writer, "moving volume %d %s from %s to dataNode %s with disk type ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.String())
  87. hasFoundTarget = true
  88. if !applyChanges {
  89. break
  90. }
  91. // mark all replicas as read only
  92. err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations)
  93. if err != nil {
  94. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  95. }
  96. return LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String())
  97. }
  98. }
  99. if !hasFoundTarget {
  100. fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.String(), vid)
  101. }
  102. return nil
  103. }
  104. func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  105. quietSeconds := int64(quietPeriod / time.Second)
  106. nowUnixSeconds := time.Now().Unix()
  107. fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
  108. vidMap := make(map[uint32]bool)
  109. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  110. for _, diskInfo := range dn.DiskInfos {
  111. for _, v := range diskInfo.VolumeInfos {
  112. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
  113. if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
  114. vidMap[v.Id] = true
  115. }
  116. }
  117. }
  118. }
  119. })
  120. for vid := range vidMap {
  121. vids = append(vids, needle.VolumeId(vid))
  122. }
  123. return
  124. }