You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

362 lines
10 KiB

  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. "golang.org/x/exp/maps"
  14. "golang.org/x/exp/slices"
  15. "github.com/seaweedfs/seaweedfs/weed/operation"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  20. )
// init registers the fs.mergeVolumes command with the shell's global
// command registry so it is discoverable at runtime.
func init() {
	Commands = append(Commands, &commandFsMergeVolumes{})
}
// commandFsMergeVolumes relocates chunks from lighter volumes into heavier
// compatible volumes so the emptied volumes can later be reclaimed by the
// vacuum system.
type commandFsMergeVolumes struct {
	// volumes caches volume information fetched from the master, keyed by
	// volume id (the first replica encountered wins; see reloadVolumesInfo).
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
	// volumeSizeLimit is the master's per-volume size cap, in bytes.
	volumeSizeLimit uint64
}
// Name returns the shell command identifier, "fs.mergeVolumes".
func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}
// Help returns the usage text displayed by the shell's help system.
func (c *commandFsMergeVolumes) Help() string {
	return `re-locate chunks into target volumes and try to clear lighter volumes.
This would help clear half-full volumes and let vacuum system to delete them later.
fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
`
}
  37. func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  39. dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
  40. fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
  41. toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
  42. collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
  43. apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
  44. if err = fsMergeVolumesCommand.Parse(args); err != nil {
  45. return err
  46. }
  47. dir := *dirArg
  48. if dir != "/" {
  49. dir = strings.TrimRight(dir, "/")
  50. }
  51. fromVolumeId := needle.VolumeId(*fromVolumeArg)
  52. toVolumeId := needle.VolumeId(*toVolumeArg)
  53. c.reloadVolumesInfo(commandEnv.MasterClient)
  54. if fromVolumeId != 0 && toVolumeId != 0 {
  55. if fromVolumeId == toVolumeId {
  56. return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
  57. }
  58. compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
  59. if err != nil {
  60. return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
  61. }
  62. if !compatible {
  63. return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
  64. }
  65. fromSize := c.getVolumeSizeById(fromVolumeId)
  66. toSize := c.getVolumeSizeById(toVolumeId)
  67. if fromSize+toSize > c.volumeSizeLimit {
  68. return fmt.Errorf(
  69. "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
  70. fromVolumeId, fromSize/1024/1024,
  71. toVolumeId, toSize/1024/1024,
  72. c.volumeSizeLimit/1024/102,
  73. )
  74. }
  75. }
  76. plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
  77. if err != nil {
  78. return err
  79. }
  80. c.printPlan(plan)
  81. if len(plan) == 0 {
  82. return nil
  83. }
  84. defer util_http.GetGlobalHttpClient().CloseIdleConnections()
  85. return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  86. return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  87. if entry.IsDirectory {
  88. return
  89. }
  90. for _, chunk := range entry.Chunks {
  91. if chunk.IsChunkManifest {
  92. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  93. continue
  94. }
  95. chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
  96. toVolumeId, found := plan[chunkVolumeId]
  97. if !found {
  98. continue
  99. }
  100. path := parentPath.Child(entry.Name)
  101. fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
  102. if !*apply {
  103. continue
  104. }
  105. if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
  106. fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
  107. continue
  108. }
  109. if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
  110. Directory: string(parentPath),
  111. Entry: entry,
  112. }); err != nil {
  113. fmt.Printf("failed to update %s: %v\n", path, err)
  114. }
  115. }
  116. })
  117. })
  118. }
  119. func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
  120. info := c.volumes[vid]
  121. var err error
  122. if info == nil {
  123. err = errors.New("cannot find volume")
  124. }
  125. return info, err
  126. }
  127. func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
  128. srcInfo, err := c.getVolumeInfoById(src)
  129. if err != nil {
  130. return false, err
  131. }
  132. destInfo, err := c.getVolumeInfoById(dest)
  133. if err != nil {
  134. return false, err
  135. }
  136. return (srcInfo.Collection == destInfo.Collection &&
  137. srcInfo.Ttl == destInfo.Ttl &&
  138. srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
  139. }
  140. func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
  141. c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
  142. return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  143. volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  144. if err != nil {
  145. return err
  146. }
  147. c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
  148. for _, dc := range volumes.TopologyInfo.DataCenterInfos {
  149. for _, rack := range dc.RackInfos {
  150. for _, node := range rack.DataNodeInfos {
  151. for _, disk := range node.DiskInfos {
  152. for _, volume := range disk.VolumeInfos {
  153. vid := needle.VolumeId(volume.Id)
  154. if found := c.volumes[vid]; found == nil {
  155. c.volumes[vid] = volume
  156. }
  157. }
  158. }
  159. }
  160. }
  161. }
  162. return nil
  163. })
  164. }
  165. func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
  166. plan := make(map[needle.VolumeId]needle.VolumeId)
  167. volumes := maps.Keys(c.volumes)
  168. sort.Slice(volumes, func(a, b int) bool {
  169. return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
  170. })
  171. l := len(volumes)
  172. for i := 0; i < l; i++ {
  173. volume := c.volumes[volumes[i]]
  174. if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
  175. volumes = slices.Delete(volumes, i, i+1)
  176. i--
  177. l--
  178. }
  179. }
  180. for i := l - 1; i >= 0; i-- {
  181. src := volumes[i]
  182. if fromVolumeId != 0 && src != fromVolumeId {
  183. continue
  184. }
  185. for j := 0; j < i; j++ {
  186. condidate := volumes[j]
  187. if toVolumeId != 0 && condidate != toVolumeId {
  188. continue
  189. }
  190. if _, moving := plan[condidate]; moving {
  191. continue
  192. }
  193. compatible, err := c.volumesAreCompatible(src, condidate)
  194. if err != nil {
  195. return nil, err
  196. }
  197. if !compatible {
  198. continue
  199. }
  200. if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
  201. continue
  202. }
  203. plan[src] = condidate
  204. break
  205. }
  206. }
  207. return plan, nil
  208. }
  209. func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
  210. size := c.getVolumeSizeById(vid)
  211. for src, dest := range plan {
  212. if dest == vid {
  213. size += c.getVolumeSizeById(src)
  214. }
  215. }
  216. return size
  217. }
// getVolumeSize returns the live payload of a volume in bytes: its raw size
// minus the bytes already marked deleted (reclaimable by vacuum).
func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
	return volume.Size - volume.DeletedByteCount
}
// getVolumeSizeById is a convenience wrapper around getVolumeSize keyed by
// volume id. NOTE(review): an id absent from c.volumes yields a nil pointer
// and panics in getVolumeSize — callers only pass ids taken from that map.
func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
	return c.getVolumeSize(c.volumes[vid])
}
  224. func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
  225. fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
  226. reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
  227. for src, dest := range plan {
  228. reversePlan[dest] = append(reversePlan[dest], src)
  229. }
  230. for dest, srcs := range reversePlan {
  231. currentSize := c.getVolumeSizeById(dest)
  232. for _, src := range srcs {
  233. srcSize := c.getVolumeSizeById(src)
  234. newSize := currentSize + srcSize
  235. fmt.Printf(
  236. "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
  237. src, srcSize/1024/1024,
  238. dest, currentSize/1024/1024, newSize/1024/1024,
  239. )
  240. currentSize = newSize
  241. }
  242. fmt.Println()
  243. }
  244. }
  245. func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
  246. fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
  247. toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
  248. downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
  249. if err != nil {
  250. return err
  251. }
  252. downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
  253. uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
  254. if err != nil {
  255. return err
  256. }
  257. uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
  258. resp, reader, err := readUrl(downloadURL)
  259. if err != nil {
  260. return err
  261. }
  262. defer util_http.CloseResponse(resp)
  263. defer reader.Close()
  264. var filename string
  265. contentDisposition := resp.Header.Get("Content-Disposition")
  266. if len(contentDisposition) > 0 {
  267. idx := strings.Index(contentDisposition, "filename=")
  268. if idx != -1 {
  269. filename = contentDisposition[idx+len("filename="):]
  270. filename = strings.Trim(filename, "\"")
  271. }
  272. }
  273. contentType := resp.Header.Get("Content-Type")
  274. isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
  275. md5 := resp.Header.Get("Content-MD5")
  276. uploader, err := operation.NewUploader()
  277. if err != nil {
  278. return err
  279. }
  280. _, err, _ = uploader.Upload(reader, &operation.UploadOption{
  281. UploadUrl: uploadURL,
  282. Filename: filename,
  283. IsInputCompressed: isCompressed,
  284. Cipher: false,
  285. MimeType: contentType,
  286. PairMap: nil,
  287. Md5: md5,
  288. })
  289. if err != nil {
  290. return err
  291. }
  292. chunk.Fid.VolumeId = uint32(toVolumeId)
  293. chunk.FileId = ""
  294. return nil
  295. }
  296. func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
  297. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  298. if err != nil {
  299. return nil, nil, err
  300. }
  301. req.Header.Add("Accept-Encoding", "gzip")
  302. r, err := util_http.GetGlobalHttpClient().Do(req)
  303. if err != nil {
  304. return nil, nil, err
  305. }
  306. if r.StatusCode >= 400 {
  307. util_http.CloseResponse(r)
  308. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  309. }
  310. return r, r.Body, nil
  311. }