You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

366 lines
10 KiB

4 months ago
4 months ago
  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. "golang.org/x/exp/maps"
  14. "golang.org/x/exp/slices"
  15. "github.com/seaweedfs/seaweedfs/weed/operation"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  20. )
  21. func init() {
  22. Commands = append(Commands, &commandFsMergeVolumes{})
  23. }
  24. type commandFsMergeVolumes struct {
  25. volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
  26. volumeSizeLimit uint64
  27. }
  28. func (c *commandFsMergeVolumes) Name() string {
  29. return "fs.mergeVolumes"
  30. }
  31. func (c *commandFsMergeVolumes) Help() string {
  32. return `re-locate chunks into target volumes and try to clear lighter volumes.
  33. This would help clear half-full volumes and let vacuum system to delete them later.
  34. fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
  35. `
  36. }
  37. func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
  38. return false
  39. }
  40. func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  41. fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
  43. fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
  44. toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
  45. collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
  46. apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
  47. if err = fsMergeVolumesCommand.Parse(args); err != nil {
  48. return err
  49. }
  50. dir := *dirArg
  51. if dir != "/" {
  52. dir = strings.TrimRight(dir, "/")
  53. }
  54. fromVolumeId := needle.VolumeId(*fromVolumeArg)
  55. toVolumeId := needle.VolumeId(*toVolumeArg)
  56. c.reloadVolumesInfo(commandEnv.MasterClient)
  57. if fromVolumeId != 0 && toVolumeId != 0 {
  58. if fromVolumeId == toVolumeId {
  59. return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
  60. }
  61. compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
  62. if err != nil {
  63. return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
  64. }
  65. if !compatible {
  66. return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
  67. }
  68. fromSize := c.getVolumeSizeById(fromVolumeId)
  69. toSize := c.getVolumeSizeById(toVolumeId)
  70. if fromSize+toSize > c.volumeSizeLimit {
  71. return fmt.Errorf(
  72. "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
  73. fromVolumeId, fromSize/1024/1024,
  74. toVolumeId, toSize/1024/1024,
  75. c.volumeSizeLimit/1024/102,
  76. )
  77. }
  78. }
  79. plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
  80. if err != nil {
  81. return err
  82. }
  83. c.printPlan(plan)
  84. if len(plan) == 0 {
  85. return nil
  86. }
  87. defer util_http.GetGlobalHttpClient().CloseIdleConnections()
  88. return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  89. return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  90. if entry.IsDirectory {
  91. return
  92. }
  93. for _, chunk := range entry.Chunks {
  94. if chunk.IsChunkManifest {
  95. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  96. continue
  97. }
  98. chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
  99. toVolumeId, found := plan[chunkVolumeId]
  100. if !found {
  101. continue
  102. }
  103. path := parentPath.Child(entry.Name)
  104. fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
  105. if !*apply {
  106. continue
  107. }
  108. if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
  109. fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
  110. continue
  111. }
  112. if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
  113. Directory: string(parentPath),
  114. Entry: entry,
  115. }); err != nil {
  116. fmt.Printf("failed to update %s: %v\n", path, err)
  117. }
  118. }
  119. })
  120. })
  121. }
  122. func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
  123. info := c.volumes[vid]
  124. var err error
  125. if info == nil {
  126. err = errors.New("cannot find volume")
  127. }
  128. return info, err
  129. }
  130. func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
  131. srcInfo, err := c.getVolumeInfoById(src)
  132. if err != nil {
  133. return false, err
  134. }
  135. destInfo, err := c.getVolumeInfoById(dest)
  136. if err != nil {
  137. return false, err
  138. }
  139. return (srcInfo.Collection == destInfo.Collection &&
  140. srcInfo.Ttl == destInfo.Ttl &&
  141. srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
  142. }
  143. func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
  144. c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
  145. return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  146. volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  147. if err != nil {
  148. return err
  149. }
  150. c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
  151. for _, dc := range volumes.TopologyInfo.DataCenterInfos {
  152. for _, rack := range dc.RackInfos {
  153. for _, node := range rack.DataNodeInfos {
  154. for _, disk := range node.DiskInfos {
  155. for _, volume := range disk.VolumeInfos {
  156. vid := needle.VolumeId(volume.Id)
  157. if found := c.volumes[vid]; found == nil {
  158. c.volumes[vid] = volume
  159. }
  160. }
  161. }
  162. }
  163. }
  164. }
  165. return nil
  166. })
  167. }
  168. func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
  169. plan := make(map[needle.VolumeId]needle.VolumeId)
  170. volumes := maps.Keys(c.volumes)
  171. sort.Slice(volumes, func(a, b int) bool {
  172. return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
  173. })
  174. l := len(volumes)
  175. for i := 0; i < l; i++ {
  176. volume := c.volumes[volumes[i]]
  177. if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
  178. volumes = slices.Delete(volumes, i, i+1)
  179. i--
  180. l--
  181. }
  182. }
  183. for i := l - 1; i >= 0; i-- {
  184. src := volumes[i]
  185. if fromVolumeId != 0 && src != fromVolumeId {
  186. continue
  187. }
  188. for j := 0; j < i; j++ {
  189. condidate := volumes[j]
  190. if toVolumeId != 0 && condidate != toVolumeId {
  191. continue
  192. }
  193. if _, moving := plan[condidate]; moving {
  194. continue
  195. }
  196. compatible, err := c.volumesAreCompatible(src, condidate)
  197. if err != nil {
  198. return nil, err
  199. }
  200. if !compatible {
  201. continue
  202. }
  203. if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
  204. continue
  205. }
  206. plan[src] = condidate
  207. break
  208. }
  209. }
  210. return plan, nil
  211. }
  212. func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
  213. size := c.getVolumeSizeById(vid)
  214. for src, dest := range plan {
  215. if dest == vid {
  216. size += c.getVolumeSizeById(src)
  217. }
  218. }
  219. return size
  220. }
  221. func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
  222. return volume.Size - volume.DeletedByteCount
  223. }
  224. func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
  225. return c.getVolumeSize(c.volumes[vid])
  226. }
  227. func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
  228. fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
  229. reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
  230. for src, dest := range plan {
  231. reversePlan[dest] = append(reversePlan[dest], src)
  232. }
  233. for dest, srcs := range reversePlan {
  234. currentSize := c.getVolumeSizeById(dest)
  235. for _, src := range srcs {
  236. srcSize := c.getVolumeSizeById(src)
  237. newSize := currentSize + srcSize
  238. fmt.Printf(
  239. "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
  240. src, srcSize/1024/1024,
  241. dest, currentSize/1024/1024, newSize/1024/1024,
  242. )
  243. currentSize = newSize
  244. }
  245. fmt.Println()
  246. }
  247. }
  248. func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
  249. fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
  250. toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
  251. downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
  252. if err != nil {
  253. return err
  254. }
  255. downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
  256. uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
  257. if err != nil {
  258. return err
  259. }
  260. uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
  261. resp, reader, err := readUrl(downloadURL)
  262. if err != nil {
  263. return err
  264. }
  265. defer util_http.CloseResponse(resp)
  266. defer reader.Close()
  267. var filename string
  268. contentDisposition := resp.Header.Get("Content-Disposition")
  269. if len(contentDisposition) > 0 {
  270. idx := strings.Index(contentDisposition, "filename=")
  271. if idx != -1 {
  272. filename = contentDisposition[idx+len("filename="):]
  273. filename = strings.Trim(filename, "\"")
  274. }
  275. }
  276. contentType := resp.Header.Get("Content-Type")
  277. isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
  278. md5 := resp.Header.Get("Content-MD5")
  279. uploader, err := operation.NewUploader()
  280. if err != nil {
  281. return err
  282. }
  283. _, err, _ = uploader.Upload(reader, &operation.UploadOption{
  284. UploadUrl: uploadURL,
  285. Filename: filename,
  286. IsInputCompressed: isCompressed,
  287. Cipher: false,
  288. MimeType: contentType,
  289. PairMap: nil,
  290. Md5: md5,
  291. })
  292. if err != nil {
  293. return err
  294. }
  295. chunk.Fid.VolumeId = uint32(toVolumeId)
  296. chunk.FileId = ""
  297. return nil
  298. }
  299. func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
  300. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  301. if err != nil {
  302. return nil, nil, err
  303. }
  304. req.Header.Add("Accept-Encoding", "gzip")
  305. r, err := util_http.GetGlobalHttpClient().Do(req)
  306. if err != nil {
  307. return nil, nil, err
  308. }
  309. if r.StatusCode >= 400 {
  310. util_http.CloseResponse(r)
  311. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  312. }
  313. return r, r.Body, nil
  314. }