package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
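
// client is a shared HTTP client used for downloading chunk data from volume
// servers; Do defers CloseIdleConnections on it so pooled connections are
// released once the command finishes.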
var (
	client *http.Client
)

func init() {
	client = &http.Client{}
	Commands = append(Commands, &commandFsMergeVolumes{})
}
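
// commandFsMergeVolumes caches the volume information reported by the master,
// keyed by volume id, so compatibility checks do not need repeated lookups.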
type commandFsMergeVolumes struct {
	volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
}

func (c *commandFsMergeVolumes) Name() string {
	return "fs.mergeVolumes"
}

func (c *commandFsMergeVolumes) Help() string {
	return `relocate chunks into target volumes and try to empty lighter volumes.

	This helps clear half-full volumes so that the vacuum system can delete them later.

	fs.mergeVolumes -toVolumeId=y [-fromVolumeId=x] [-apply] /dir/
`
}
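
// Do walks the directory tree on the filer and, for every chunk stored in a
// volume compatible with the target volume, copies the chunk data to the
// target volume and updates the filer entry. Without -apply it only prints the
// moves it would perform.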
func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	dir, err := commandEnv.parseUrl(findInputDirectory(args))
	if err != nil {
		return err
	}
	dir = strings.TrimRight(dir, "/")

	fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
	toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
	apply := fsMergeVolumesCommand.Bool("apply", false, "apply the metadata changes")
	if err = fsMergeVolumesCommand.Parse(args); err != nil {
		return err
	}

	fromVolumeId := needle.VolumeId(*fromVolumeArg)
	toVolumeId := needle.VolumeId(*toVolumeArg)

	if toVolumeId == 0 {
		return fmt.Errorf("volume id can not be zero")
	}
	if err = c.reloadVolumesInfo(commandEnv.MasterClient); err != nil {
		return err
	}
	toVolumeInfo, err := c.getVolumeInfoById(toVolumeId)
	if err != nil {
		return err
	}
	if toVolumeInfo.ReadOnly {
		return fmt.Errorf("volume is readonly: %d", toVolumeId)
	}

	if fromVolumeId != 0 {
		if fromVolumeId == toVolumeId {
			return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
		}
		compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
		if err != nil {
			return fmt.Errorf("cannot determine whether volumes %d and %d are compatible: %v", fromVolumeId, toVolumeId, err)
		}
		if !compatible {
			return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
		}
	}

	defer client.CloseIdleConnections()

	// cache compatibility results per (source, target) volume pair
	compatibility := make(map[string]bool)

	return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
			if !entry.IsDirectory {
				for _, chunk := range entry.Chunks {
					if chunk.IsChunkManifest {
						fmt.Printf("changing the volume id of a large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
						continue
					}
					chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
					if chunkVolumeId == toVolumeId || (fromVolumeId != 0 && fromVolumeId != chunkVolumeId) {
						continue
					}
					cacheKey := fmt.Sprintf("%d-%d", chunkVolumeId, toVolumeId)
					compatible, cached := compatibility[cacheKey]
					if !cached {
						compatible, err = c.volumesAreCompatible(chunkVolumeId, toVolumeId)
						if err != nil {
							fmt.Fprintf(writer, "cannot determine whether volumes %d and %d are compatible: %v\n", chunkVolumeId, toVolumeId, err)
							return
						}
						compatibility[cacheKey] = compatible
					}
					if !compatible {
						if fromVolumeId != 0 {
							fmt.Fprintf(writer, "volumes are incompatible: %d and %d\n", fromVolumeId, toVolumeId)
							return
						}
						continue
					}
					path := parentPath.Child(entry.Name)

					fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
					if !*apply {
						continue
					}
					if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
						fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
						continue
					}
					if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
						Directory: string(parentPath),
						Entry:     entry,
					}); err != nil {
						fmt.Printf("failed to update %s: %v\n", path, err)
					}
				}
			}
		})
	})
}
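
// getVolumeInfoById returns the cached volume information for vid, or an error
// if the volume is unknown to the cached master topology.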
func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
	info := c.volumes[vid]
	var err error
	if info == nil {
		err = errors.New("cannot find volume")
	}
	return info, err
}
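
// volumesAreCompatible reports whether two volumes share the same collection,
// TTL and replica placement, i.e. whether chunks may be moved between them.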
func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
	srcInfo, err := c.getVolumeInfoById(src)
	if err != nil {
		return false, err
	}
	destInfo, err := c.getVolumeInfoById(dest)
	if err != nil {
		return false, err
	}
	return (srcInfo.Collection == destInfo.Collection &&
		srcInfo.Ttl == destInfo.Ttl &&
		srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
}
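
// reloadVolumesInfo rebuilds the volume cache from the master's topology,
// keeping the first replica seen for each volume id.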
func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
	c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)

	return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		for _, dc := range volumes.TopologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, node := range rack.DataNodeInfos {
					for _, disk := range node.DiskInfos {
						for _, volume := range disk.VolumeInfos {
							vid := needle.VolumeId(volume.Id)
							if found := c.volumes[vid]; found == nil {
								c.volumes[vid] = volume
							}
						}
					}
				}
			}
		}
		return nil
	})
}
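
// moveChunk downloads the chunk from a volume server holding the source volume
// and uploads it to a volume server holding the target volume, keeping the
// same file key and cookie. On success it rewrites the chunk's volume id in
// place and clears the cached FileId string.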
func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
	fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
	toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)

	downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
	if err != nil {
		return err
	}
	downloadURL := fmt.Sprintf("http://%s/%s", downloadURLs[0], fromFid.String())

	uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
	if err != nil {
		return err
	}
	uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())

	resp, reader, err := readUrl(downloadURL)
	if err != nil {
		return err
	}
	defer util.CloseResponse(resp)
	defer reader.Close()

	var filename string
	contentDisposition := resp.Header.Get("Content-Disposition")
	if len(contentDisposition) > 0 {
		idx := strings.Index(contentDisposition, "filename=")
		if idx != -1 {
			filename = contentDisposition[idx+len("filename="):]
			filename = strings.Trim(filename, "\"")
		}
	}

	contentType := resp.Header.Get("Content-Type")
	isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
	md5 := resp.Header.Get("Content-MD5")

	_, err, _ = operation.Upload(reader, &operation.UploadOption{
		UploadUrl:         uploadURL,
		Filename:          filename,
		IsInputCompressed: isCompressed,
		Cipher:            false,
		MimeType:          contentType,
		PairMap:           nil,
		Md5:               md5,
	})
	if err != nil {
		return err
	}

	chunk.Fid.VolumeId = uint32(toVolumeId)
	chunk.FileId = ""
	return nil
}
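
// readUrl issues a GET request with gzip accepted and returns the response and
// its body; the caller is responsible for closing both.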
func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
	req, err := http.NewRequest("GET", fileUrl, nil)
	if err != nil {
		return nil, nil, err
	}
	req.Header.Add("Accept-Encoding", "gzip")

	r, err := client.Do(req)
	if err != nil {
		return nil, nil, err
	}
	if r.StatusCode >= 400 {
		util.CloseResponse(r)
		return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
	}
	return r, r.Body, nil
}