You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

391 lines
11 KiB

4 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
3 months ago
4 months ago
  1. package shell
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "slices"
  12. "github.com/seaweedfs/seaweedfs/weed/security"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  14. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  15. "golang.org/x/exp/maps"
  16. "github.com/seaweedfs/seaweedfs/weed/operation"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  21. )
// init registers this command with the shell's global command list.
func init() {
	Commands = append(Commands, &commandFsMergeVolumes{})
}
// commandFsMergeVolumes relocates chunks from lighter volumes into heavier
// compatible volumes so the emptied volumes can later be reclaimed by the
// vacuum system.
type commandFsMergeVolumes struct {
	// volumes is a snapshot of volume id -> volume info, populated by
	// reloadVolumesInfo (one entry per volume id; the first replica seen wins).
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
	// volumeSizeLimit is the master's per-volume size limit, in bytes.
	volumeSizeLimit uint64
}
// Name returns the shell command name used to invoke this command.
func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}
// Help returns the usage text shown by the shell's help system.
func (c *commandFsMergeVolumes) Help() string {
	return `re-locate chunks into target volumes and try to clear lighter volumes.
This would help clear half-full volumes and let vacuum system to delete them later.
fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
`
}
// HasTag reports whether this command carries the given tag; it carries none.
func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
	return false
}
  41. func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  42. fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  43. dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
  44. fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
  45. toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
  46. collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
  47. apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
  48. if err = fsMergeVolumesCommand.Parse(args); err != nil {
  49. return err
  50. }
  51. dir := *dirArg
  52. if dir != "/" {
  53. dir = strings.TrimRight(dir, "/")
  54. }
  55. fromVolumeId := needle.VolumeId(*fromVolumeArg)
  56. toVolumeId := needle.VolumeId(*toVolumeArg)
  57. c.reloadVolumesInfo(commandEnv.MasterClient)
  58. if fromVolumeId != 0 && toVolumeId != 0 {
  59. if fromVolumeId == toVolumeId {
  60. return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
  61. }
  62. compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
  63. if err != nil {
  64. return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
  65. }
  66. if !compatible {
  67. return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
  68. }
  69. fromSize := c.getVolumeSizeById(fromVolumeId)
  70. toSize := c.getVolumeSizeById(toVolumeId)
  71. if fromSize+toSize > c.volumeSizeLimit {
  72. return fmt.Errorf(
  73. "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
  74. fromVolumeId, fromSize/1024/1024,
  75. toVolumeId, toSize/1024/1024,
  76. c.volumeSizeLimit/1024/102,
  77. )
  78. }
  79. }
  80. plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
  81. if err != nil {
  82. return err
  83. }
  84. c.printPlan(plan)
  85. if len(plan) == 0 {
  86. return nil
  87. }
  88. defer util_http.GetGlobalHttpClient().CloseIdleConnections()
  89. return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  90. return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  91. if entry.IsDirectory {
  92. return
  93. }
  94. for _, chunk := range entry.Chunks {
  95. chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
  96. toVolumeId, found := plan[chunkVolumeId]
  97. if !found {
  98. continue
  99. }
  100. if chunk.IsChunkManifest {
  101. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  102. continue
  103. }
  104. path := parentPath.Child(entry.Name)
  105. fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
  106. if !*apply {
  107. continue
  108. }
  109. if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
  110. fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
  111. continue
  112. }
  113. if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
  114. Directory: string(parentPath),
  115. Entry: entry,
  116. }); err != nil {
  117. fmt.Printf("failed to update %s: %v\n", path, err)
  118. }
  119. }
  120. })
  121. })
  122. }
  123. func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
  124. info := c.volumes[vid]
  125. var err error
  126. if info == nil {
  127. err = errors.New("cannot find volume")
  128. }
  129. return info, err
  130. }
  131. func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
  132. srcInfo, err := c.getVolumeInfoById(src)
  133. if err != nil {
  134. return false, err
  135. }
  136. destInfo, err := c.getVolumeInfoById(dest)
  137. if err != nil {
  138. return false, err
  139. }
  140. return (srcInfo.Collection == destInfo.Collection &&
  141. srcInfo.Ttl == destInfo.Ttl &&
  142. srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
  143. }
  144. func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
  145. c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
  146. return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  147. volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  148. if err != nil {
  149. return err
  150. }
  151. c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
  152. for _, dc := range volumes.TopologyInfo.DataCenterInfos {
  153. for _, rack := range dc.RackInfos {
  154. for _, node := range rack.DataNodeInfos {
  155. for _, disk := range node.DiskInfos {
  156. for _, volume := range disk.VolumeInfos {
  157. vid := needle.VolumeId(volume.Id)
  158. if found := c.volumes[vid]; found == nil {
  159. c.volumes[vid] = volume
  160. }
  161. }
  162. }
  163. }
  164. }
  165. }
  166. return nil
  167. })
  168. }
// createMergePlan decides which volumes should merge into which, returning a
// map of source volume id -> destination volume id.
//
// Candidates are limited to writable, non-empty volumes of the requested
// collection ("*" matches all). Ids are sorted by raw Size descending; the
// pairing pass then walks sources from lightest to heaviest and, for each,
// scans heavier volumes for a compatible destination with room left under
// c.volumeSizeLimit. Non-zero fromVolumeId/toVolumeId pin the source and/or
// destination respectively.
func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
	plan := make(map[needle.VolumeId]needle.VolumeId)
	volumeIds := maps.Keys(c.volumes)
	// Heaviest first: index b compared against index a inverts the order.
	sort.Slice(volumeIds, func(a, b int) bool {
		return c.volumes[volumeIds[b]].Size < c.volumes[volumeIds[a]].Size
	})
	l := len(volumeIds)
	// Filter pass: drop read-only, empty, or wrong-collection volumes.
	// If an explicitly requested volume would be dropped, fail loudly with
	// the specific reason instead of silently producing an empty plan.
	for i := 0; i < l; i++ {
		volume := c.volumes[volumeIds[i]]
		if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
			if fromVolumeId != 0 && volumeIds[i] == fromVolumeId || toVolumeId != 0 && volumeIds[i] == toVolumeId {
				if volume.GetReadOnly() {
					return nil, fmt.Errorf("volume %d is readonly", volumeIds[i])
				}
				if c.getVolumeSize(volume) == 0 {
					return nil, fmt.Errorf("volume %d is empty", volumeIds[i])
				}
			}
			// Remove in place; i--/l-- re-examines the element that
			// slices.Delete shifted into slot i.
			volumeIds = slices.Delete(volumeIds, i, i+1)
			i--
			l--
		}
	}
	// Pairing pass: i walks sources lightest-to-heaviest; j walks destination
	// candidates heaviest-down, stopping before i so a volume never merges
	// into a lighter one.
	for i := l - 1; i >= 0; i-- {
		src := volumeIds[i]
		if fromVolumeId != 0 && src != fromVolumeId {
			continue
		}
		for j := 0; j < i; j++ {
			candidate := volumeIds[j]
			if toVolumeId != 0 && candidate != toVolumeId {
				continue
			}
			// A volume already scheduled to move elsewhere cannot receive data.
			if _, moving := plan[candidate]; moving {
				continue
			}
			compatible, err := c.volumesAreCompatible(src, candidate)
			if err != nil {
				return nil, err
			}
			if !compatible {
				fmt.Printf("volume %d is not compatible with volume %d\n", src, candidate)
				continue
			}
			// Size check accounts for sources already planned into candidate.
			if c.getVolumeSizeBasedOnPlan(plan, candidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
				fmt.Printf("volume %d (%d MB) merge into volume %d (%d MB) exceeds volume size limit (%d MB)\n",
					src, c.getVolumeSizeById(src)/1024/1024,
					candidate, c.getVolumeSizeById(candidate)/1024/1024,
					c.volumeSizeLimit/1024/1024)
				continue
			}
			plan[src] = candidate
			break
		}
	}
	return plan, nil
}
  226. func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
  227. size := c.getVolumeSizeById(vid)
  228. for src, dest := range plan {
  229. if dest == vid {
  230. size += c.getVolumeSizeById(src)
  231. }
  232. }
  233. return size
  234. }
// getVolumeSize returns the live (non-deleted) byte count of a volume.
// NOTE(review): assumes DeletedByteCount <= Size; otherwise this uint64
// subtraction would wrap to a huge value — confirm the master maintains
// that invariant.
func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
	return volume.Size - volume.DeletedByteCount
}
// getVolumeSizeById returns the live size of the volume with the given id.
// NOTE(review): dereferences the map entry without a nil check, so an id
// absent from c.volumes would panic; callers pass ids taken from (or already
// validated against) the same snapshot.
func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
	return c.getVolumeSize(c.volumes[vid])
}
  241. func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
  242. fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
  243. reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
  244. for src, dest := range plan {
  245. reversePlan[dest] = append(reversePlan[dest], src)
  246. }
  247. for dest, srcs := range reversePlan {
  248. currentSize := c.getVolumeSizeById(dest)
  249. for _, src := range srcs {
  250. srcSize := c.getVolumeSizeById(src)
  251. newSize := currentSize + srcSize
  252. fmt.Printf(
  253. "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
  254. src, srcSize/1024/1024,
  255. dest, currentSize/1024/1024, newSize/1024/1024,
  256. )
  257. currentSize = newSize
  258. }
  259. fmt.Println()
  260. }
  261. }
  262. func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
  263. fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
  264. toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
  265. downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
  266. if err != nil {
  267. return err
  268. }
  269. downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
  270. uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
  271. if err != nil {
  272. return err
  273. }
  274. uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
  275. resp, reader, err := readUrl(downloadURL)
  276. if err != nil {
  277. return err
  278. }
  279. defer util_http.CloseResponse(resp)
  280. defer reader.Close()
  281. var filename string
  282. contentDisposition := resp.Header.Get("Content-Disposition")
  283. if len(contentDisposition) > 0 {
  284. idx := strings.Index(contentDisposition, "filename=")
  285. if idx != -1 {
  286. filename = contentDisposition[idx+len("filename="):]
  287. filename = strings.Trim(filename, "\"")
  288. }
  289. }
  290. contentType := resp.Header.Get("Content-Type")
  291. isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
  292. md5 := resp.Header.Get("Content-MD5")
  293. uploader, err := operation.NewUploader()
  294. if err != nil {
  295. return err
  296. }
  297. v := util.GetViper()
  298. signingKey := v.GetString("jwt.signing.key")
  299. var jwt security.EncodedJwt
  300. if signingKey != "" {
  301. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  302. jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String())
  303. }
  304. _, err, _ = uploader.Upload(reader, &operation.UploadOption{
  305. UploadUrl: uploadURL,
  306. Filename: filename,
  307. IsInputCompressed: isCompressed,
  308. Cipher: false,
  309. MimeType: contentType,
  310. PairMap: nil,
  311. Md5: md5,
  312. Jwt: security.EncodedJwt(jwt),
  313. })
  314. if err != nil {
  315. return err
  316. }
  317. chunk.Fid.VolumeId = uint32(toVolumeId)
  318. chunk.FileId = ""
  319. return nil
  320. }
  321. func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
  322. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  323. if err != nil {
  324. return nil, nil, err
  325. }
  326. req.Header.Add("Accept-Encoding", "gzip")
  327. r, err := util_http.GetGlobalHttpClient().Do(req)
  328. if err != nil {
  329. return nil, nil, err
  330. }
  331. if r.StatusCode >= 400 {
  332. util_http.CloseResponse(r)
  333. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  334. }
  335. return r, r.Body, nil
  336. }