You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

361 lines
10 KiB

  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. "golang.org/x/exp/maps"
  14. "golang.org/x/exp/slices"
  15. "github.com/seaweedfs/seaweedfs/weed/operation"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. )
  20. var (
  21. client *http.Client
  22. )
  23. func init() {
  24. client = &http.Client{}
  25. Commands = append(Commands, &commandFsMergeVolumes{})
  26. }
// commandFsMergeVolumes implements the fs.mergeVolumes shell command. Its
// fields cache the volume list fetched by reloadVolumesInfo.
type commandFsMergeVolumes struct {
	// volumes maps a volume id to the first volume information message seen
	// for it while walking the master's topology.
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
	// volumeSizeLimit is the master's volume size limit converted to bytes.
	volumeSizeLimit uint64
}
  31. func (c *commandFsMergeVolumes) Name() string {
  32. return "fs.mergeVolumes"
  33. }
  34. func (c *commandFsMergeVolumes) Help() string {
  35. return `re-locate chunks into target volumes and try to clear lighter volumes.
  36. This would help clear half-full volumes and let vacuum system to delete them later.
  37. fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
  38. `
  39. }
  40. func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  41. fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
  43. fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
  44. toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
  45. collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
  46. apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
  47. if err = fsMergeVolumesCommand.Parse(args); err != nil {
  48. return err
  49. }
  50. dir := *dirArg
  51. if dir != "/" {
  52. dir = strings.TrimRight(dir, "/")
  53. }
  54. fromVolumeId := needle.VolumeId(*fromVolumeArg)
  55. toVolumeId := needle.VolumeId(*toVolumeArg)
  56. c.reloadVolumesInfo(commandEnv.MasterClient)
  57. if fromVolumeId != 0 && toVolumeId != 0 {
  58. if fromVolumeId == toVolumeId {
  59. return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
  60. }
  61. compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
  62. if err != nil {
  63. return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
  64. }
  65. if !compatible {
  66. return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
  67. }
  68. fromSize := c.getVolumeSizeById(fromVolumeId)
  69. toSize := c.getVolumeSizeById(toVolumeId)
  70. if fromSize+toSize > c.volumeSizeLimit {
  71. return fmt.Errorf(
  72. "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
  73. fromVolumeId, fromSize/1024/1024,
  74. toVolumeId, toSize/1024/1024,
  75. c.volumeSizeLimit/1024/102,
  76. )
  77. }
  78. }
  79. plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
  80. if err != nil {
  81. return err
  82. }
  83. c.printPlan(plan)
  84. if len(plan) == 0 {
  85. return nil
  86. }
  87. defer client.CloseIdleConnections()
  88. return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  89. return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  90. if entry.IsDirectory {
  91. return
  92. }
  93. for _, chunk := range entry.Chunks {
  94. if chunk.IsChunkManifest {
  95. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  96. continue
  97. }
  98. chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
  99. toVolumeId, found := plan[chunkVolumeId]
  100. if !found {
  101. continue
  102. }
  103. path := parentPath.Child(entry.Name)
  104. fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
  105. if !*apply {
  106. continue
  107. }
  108. if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
  109. fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
  110. continue
  111. }
  112. if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
  113. Directory: string(parentPath),
  114. Entry: entry,
  115. }); err != nil {
  116. fmt.Printf("failed to update %s: %v\n", path, err)
  117. }
  118. }
  119. })
  120. })
  121. }
  122. func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
  123. info := c.volumes[vid]
  124. var err error
  125. if info == nil {
  126. err = errors.New("cannot find volume")
  127. }
  128. return info, err
  129. }
  130. func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
  131. srcInfo, err := c.getVolumeInfoById(src)
  132. if err != nil {
  133. return false, err
  134. }
  135. destInfo, err := c.getVolumeInfoById(dest)
  136. if err != nil {
  137. return false, err
  138. }
  139. return (srcInfo.Collection == destInfo.Collection &&
  140. srcInfo.Ttl == destInfo.Ttl &&
  141. srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
  142. }
  143. func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
  144. c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
  145. return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  146. volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  147. if err != nil {
  148. return err
  149. }
  150. c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
  151. for _, dc := range volumes.TopologyInfo.DataCenterInfos {
  152. for _, rack := range dc.RackInfos {
  153. for _, node := range rack.DataNodeInfos {
  154. for _, disk := range node.DiskInfos {
  155. for _, volume := range disk.VolumeInfos {
  156. vid := needle.VolumeId(volume.Id)
  157. if found := c.volumes[vid]; found == nil {
  158. c.volumes[vid] = volume
  159. }
  160. }
  161. }
  162. }
  163. }
  164. }
  165. return nil
  166. })
  167. }
  168. func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
  169. plan := make(map[needle.VolumeId]needle.VolumeId)
  170. volumes := maps.Keys(c.volumes)
  171. sort.Slice(volumes, func(a, b int) bool {
  172. return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
  173. })
  174. l := len(volumes)
  175. for i := 0; i < l; i++ {
  176. volume := c.volumes[volumes[i]]
  177. if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
  178. volumes = slices.Delete(volumes, i, i+1)
  179. i--
  180. l--
  181. }
  182. }
  183. for i := l - 1; i >= 0; i-- {
  184. src := volumes[i]
  185. if fromVolumeId != 0 && src != fromVolumeId {
  186. continue
  187. }
  188. for j := 0; j < i; j++ {
  189. condidate := volumes[j]
  190. if toVolumeId != 0 && condidate != toVolumeId {
  191. continue
  192. }
  193. if _, moving := plan[condidate]; moving {
  194. continue
  195. }
  196. compatible, err := c.volumesAreCompatible(src, condidate)
  197. if err != nil {
  198. return nil, err
  199. }
  200. if !compatible {
  201. continue
  202. }
  203. if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
  204. continue
  205. }
  206. plan[src] = condidate
  207. break
  208. }
  209. }
  210. return plan, nil
  211. }
  212. func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
  213. size := c.getVolumeSizeById(vid)
  214. for src, dest := range plan {
  215. if dest == vid {
  216. size += c.getVolumeSizeById(src)
  217. }
  218. }
  219. return size
  220. }
  221. func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
  222. return volume.Size - volume.DeletedByteCount
  223. }
  224. func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
  225. return c.getVolumeSize(c.volumes[vid])
  226. }
  227. func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
  228. fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
  229. reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
  230. for src, dest := range plan {
  231. reversePlan[dest] = append(reversePlan[dest], src)
  232. }
  233. for dest, srcs := range reversePlan {
  234. currentSize := c.getVolumeSizeById(dest)
  235. for _, src := range srcs {
  236. srcSize := c.getVolumeSizeById(src)
  237. newSize := currentSize + srcSize
  238. fmt.Printf(
  239. "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
  240. src, srcSize/1024/1024,
  241. dest, currentSize/1024/1024, newSize/1024/1024,
  242. )
  243. currentSize = newSize
  244. }
  245. fmt.Println()
  246. }
  247. }
  248. func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
  249. fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
  250. toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
  251. downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
  252. if err != nil {
  253. return err
  254. }
  255. downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
  256. uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
  257. if err != nil {
  258. return err
  259. }
  260. uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
  261. resp, reader, err := readUrl(downloadURL)
  262. if err != nil {
  263. return err
  264. }
  265. defer util.CloseResponse(resp)
  266. defer reader.Close()
  267. var filename string
  268. contentDisposition := resp.Header.Get("Content-Disposition")
  269. if len(contentDisposition) > 0 {
  270. idx := strings.Index(contentDisposition, "filename=")
  271. if idx != -1 {
  272. filename = contentDisposition[idx+len("filename="):]
  273. filename = strings.Trim(filename, "\"")
  274. }
  275. }
  276. contentType := resp.Header.Get("Content-Type")
  277. isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
  278. md5 := resp.Header.Get("Content-MD5")
  279. _, err, _ = operation.Upload(reader, &operation.UploadOption{
  280. UploadUrl: uploadURL,
  281. Filename: filename,
  282. IsInputCompressed: isCompressed,
  283. Cipher: false,
  284. MimeType: contentType,
  285. PairMap: nil,
  286. Md5: md5,
  287. })
  288. if err != nil {
  289. return err
  290. }
  291. chunk.Fid.VolumeId = uint32(toVolumeId)
  292. chunk.FileId = ""
  293. return nil
  294. }
  295. func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
  296. req, err := http.NewRequest("GET", fileUrl, nil)
  297. if err != nil {
  298. return nil, nil, err
  299. }
  300. req.Header.Add("Accept-Encoding", "gzip")
  301. r, err := client.Do(req)
  302. if err != nil {
  303. return nil, nil, err
  304. }
  305. if r.StatusCode >= 400 {
  306. util.CloseResponse(r)
  307. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  308. }
  309. return r, r.Body, nil
  310. }