You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

380 lines
11 KiB

3 months ago
3 months ago
  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. "golang.org/x/exp/maps"
  14. "golang.org/x/exp/slices"
  15. "github.com/seaweedfs/seaweedfs/weed/operation"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  20. )
// init registers the fs.mergeVolumes command with the shell's global
// command registry so it can be dispatched by name.
func init() {
	Commands = append(Commands, &commandFsMergeVolumes{})
}
// commandFsMergeVolumes relocates chunks from lighter volumes into heavier
// compatible volumes so the emptied source volumes can later be reclaimed
// by the vacuum system.
type commandFsMergeVolumes struct {
	volumes         map[needle.VolumeId]*master_pb.VolumeInformationMessage // volume id -> info, populated by reloadVolumesInfo
	volumeSizeLimit uint64                                                  // master's per-volume size limit, in bytes
}
// Name returns the shell command name used to invoke this command.
func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}
// Help returns the usage text displayed by the shell's help system.
func (c *commandFsMergeVolumes) Help() string {
	return `re-locate chunks into target volumes and try to clear lighter volumes.
This would help clear half-full volumes and let vacuum system to delete them later.
fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
`
}
// HasTag reports whether this command carries the given tag; it has none.
func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
	return false
}
  40. func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  41. fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
  43. fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
  44. toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
  45. collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
  46. apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
  47. if err = fsMergeVolumesCommand.Parse(args); err != nil {
  48. return err
  49. }
  50. dir := *dirArg
  51. if dir != "/" {
  52. dir = strings.TrimRight(dir, "/")
  53. }
  54. fromVolumeId := needle.VolumeId(*fromVolumeArg)
  55. toVolumeId := needle.VolumeId(*toVolumeArg)
  56. c.reloadVolumesInfo(commandEnv.MasterClient)
  57. if fromVolumeId != 0 && toVolumeId != 0 {
  58. if fromVolumeId == toVolumeId {
  59. return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
  60. }
  61. compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
  62. if err != nil {
  63. return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
  64. }
  65. if !compatible {
  66. return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
  67. }
  68. fromSize := c.getVolumeSizeById(fromVolumeId)
  69. toSize := c.getVolumeSizeById(toVolumeId)
  70. if fromSize+toSize > c.volumeSizeLimit {
  71. return fmt.Errorf(
  72. "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
  73. fromVolumeId, fromSize/1024/1024,
  74. toVolumeId, toSize/1024/1024,
  75. c.volumeSizeLimit/1024/102,
  76. )
  77. }
  78. }
  79. plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
  80. if err != nil {
  81. return err
  82. }
  83. c.printPlan(plan)
  84. if len(plan) == 0 {
  85. return nil
  86. }
  87. defer util_http.GetGlobalHttpClient().CloseIdleConnections()
  88. return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  89. return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  90. if entry.IsDirectory {
  91. return
  92. }
  93. for _, chunk := range entry.Chunks {
  94. if chunk.IsChunkManifest {
  95. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  96. continue
  97. }
  98. chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
  99. toVolumeId, found := plan[chunkVolumeId]
  100. if !found {
  101. continue
  102. }
  103. path := parentPath.Child(entry.Name)
  104. fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
  105. if !*apply {
  106. continue
  107. }
  108. if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
  109. fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
  110. continue
  111. }
  112. if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
  113. Directory: string(parentPath),
  114. Entry: entry,
  115. }); err != nil {
  116. fmt.Printf("failed to update %s: %v\n", path, err)
  117. }
  118. }
  119. })
  120. })
  121. }
  122. func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
  123. info := c.volumes[vid]
  124. var err error
  125. if info == nil {
  126. err = errors.New("cannot find volume")
  127. }
  128. return info, err
  129. }
  130. func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
  131. srcInfo, err := c.getVolumeInfoById(src)
  132. if err != nil {
  133. return false, err
  134. }
  135. destInfo, err := c.getVolumeInfoById(dest)
  136. if err != nil {
  137. return false, err
  138. }
  139. return (srcInfo.Collection == destInfo.Collection &&
  140. srcInfo.Ttl == destInfo.Ttl &&
  141. srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
  142. }
  143. func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
  144. c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
  145. return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  146. volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  147. if err != nil {
  148. return err
  149. }
  150. c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
  151. for _, dc := range volumes.TopologyInfo.DataCenterInfos {
  152. for _, rack := range dc.RackInfos {
  153. for _, node := range rack.DataNodeInfos {
  154. for _, disk := range node.DiskInfos {
  155. for _, volume := range disk.VolumeInfos {
  156. vid := needle.VolumeId(volume.Id)
  157. if found := c.volumes[vid]; found == nil {
  158. c.volumes[vid] = volume
  159. }
  160. }
  161. }
  162. }
  163. }
  164. }
  165. return nil
  166. })
  167. }
  168. func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
  169. plan := make(map[needle.VolumeId]needle.VolumeId)
  170. volumes := maps.Keys(c.volumes)
  171. sort.Slice(volumes, func(a, b int) bool {
  172. return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
  173. })
  174. l := len(volumes)
  175. for i := 0; i < l; i++ {
  176. volume := c.volumes[volumes[i]]
  177. if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
  178. if fromVolumeId != 0 && volumes[i] == fromVolumeId || toVolumeId != 0 && volumes[i] == toVolumeId {
  179. if volume.GetReadOnly() {
  180. return nil, fmt.Errorf("volume %d is readonly", volumes[i])
  181. }
  182. if c.getVolumeSize(volume) == 0 {
  183. return nil, fmt.Errorf("volume %d is empty", volumes[i])
  184. }
  185. }
  186. volumes = slices.Delete(volumes, i, i+1)
  187. i--
  188. l--
  189. }
  190. }
  191. for i := l - 1; i >= 0; i-- {
  192. src := volumes[i]
  193. if fromVolumeId != 0 && src != fromVolumeId {
  194. continue
  195. }
  196. for j := 0; j < i; j++ {
  197. condidate := volumes[j]
  198. if toVolumeId != 0 && condidate != toVolumeId {
  199. continue
  200. }
  201. if _, moving := plan[condidate]; moving {
  202. continue
  203. }
  204. compatible, err := c.volumesAreCompatible(src, condidate)
  205. if err != nil {
  206. return nil, err
  207. }
  208. if !compatible {
  209. fmt.Printf("volume %d is not compatible with volume %d\n", src, condidate)
  210. continue
  211. }
  212. if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
  213. fmt.Printf("volume %d (%d MB) merge into volume %d (%d MB) exceeds volume size limit (%d MB)\n",
  214. src, c.getVolumeSizeById(src)/1024/1024,
  215. condidate, c.getVolumeSizeById(condidate)/1024/1024,
  216. c.volumeSizeLimit/1024/1024)
  217. continue
  218. }
  219. plan[src] = condidate
  220. break
  221. }
  222. }
  223. return plan, nil
  224. }
  225. func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
  226. size := c.getVolumeSizeById(vid)
  227. for src, dest := range plan {
  228. if dest == vid {
  229. size += c.getVolumeSizeById(src)
  230. }
  231. }
  232. return size
  233. }
// getVolumeSize returns the occupied size of a volume in bytes: its raw size
// minus the bytes already marked as deleted.
// NOTE(review): dereferences volume directly, so a nil pointer panics —
// callers must only pass volumes known to the cache.
func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
	return volume.Size - volume.DeletedByteCount
}
// getVolumeSizeById returns the occupied size in bytes of the cached volume
// with the given id.
// NOTE(review): an unknown vid yields a nil map value and getVolumeSize
// panics on it — confirm callers only pass ids present in c.volumes.
func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
	return c.getVolumeSize(c.volumes[vid])
}
  240. func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
  241. fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
  242. reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
  243. for src, dest := range plan {
  244. reversePlan[dest] = append(reversePlan[dest], src)
  245. }
  246. for dest, srcs := range reversePlan {
  247. currentSize := c.getVolumeSizeById(dest)
  248. for _, src := range srcs {
  249. srcSize := c.getVolumeSizeById(src)
  250. newSize := currentSize + srcSize
  251. fmt.Printf(
  252. "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
  253. src, srcSize/1024/1024,
  254. dest, currentSize/1024/1024, newSize/1024/1024,
  255. )
  256. currentSize = newSize
  257. }
  258. fmt.Println()
  259. }
  260. }
// moveChunk downloads the chunk's data from a volume server holding its
// current volume and re-uploads it to a server holding toVolumeId, then
// rewrites chunk.Fid.VolumeId in place and clears the cached FileId string.
// The caller is responsible for persisting the updated entry to the filer.
func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
	// The needle key and cookie stay the same; only the volume id changes.
	fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
	toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
	downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
	if err != nil {
		return err
	}
	// Only the first replica is consulted. readDeleted=true lets a chunk that
	// was logically deleted but not yet vacuumed still be copied.
	downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
	uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
	if err != nil {
		return err
	}
	uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
	resp, reader, err := readUrl(downloadURL)
	if err != nil {
		return err
	}
	defer util_http.CloseResponse(resp)
	defer reader.Close()
	// Recover the original filename from the Content-Disposition header.
	// NOTE(review): this naive parse keeps everything after "filename=" and
	// only trims surrounding quotes — any trailing header parameters would be
	// included; confirm the volume server never appends any.
	var filename string
	contentDisposition := resp.Header.Get("Content-Disposition")
	if len(contentDisposition) > 0 {
		idx := strings.Index(contentDisposition, "filename=")
		if idx != -1 {
			filename = contentDisposition[idx+len("filename="):]
			filename = strings.Trim(filename, "\"")
		}
	}
	contentType := resp.Header.Get("Content-Type")
	isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
	md5 := resp.Header.Get("Content-MD5")
	uploader, err := operation.NewUploader()
	if err != nil {
		return err
	}
	// Re-upload preserving compression state, mime type and md5 so the new
	// needle is equivalent to the old one.
	_, err, _ = uploader.Upload(reader, &operation.UploadOption{
		UploadUrl:         uploadURL,
		Filename:          filename,
		IsInputCompressed: isCompressed,
		Cipher:            false,
		MimeType:          contentType,
		PairMap:           nil,
		Md5:               md5,
	})
	if err != nil {
		return err
	}
	// Point the chunk at its new home; FileId is a denormalized copy of the
	// fid fields, so clear it to force regeneration.
	chunk.Fid.VolumeId = uint32(toVolumeId)
	chunk.FileId = ""
	return nil
}
  312. func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
  313. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  314. if err != nil {
  315. return nil, nil, err
  316. }
  317. req.Header.Add("Accept-Encoding", "gzip")
  318. r, err := util_http.GetGlobalHttpClient().Do(req)
  319. if err != nil {
  320. return nil, nil, err
  321. }
  322. if r.StatusCode >= 400 {
  323. util_http.CloseResponse(r)
  324. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  325. }
  326. return r, r.Body, nil
  327. }