You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

263 lines
7.0 KiB

  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path/filepath"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  15. "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. func init() {
  19. Commands = append(Commands, &commandVolumeFsck{})
  20. }
  21. type commandVolumeFsck struct {
  22. env *CommandEnv
  23. }
  24. func (c *commandVolumeFsck) Name() string {
  25. return "volume.fsck"
  26. }
  27. func (c *commandVolumeFsck) Help() string {
  28. return `check all volumes to find entries not used by the filer
  29. Important assumption!!!
  30. the system is all used by one filer.
  31. This command works this way:
  32. 1. collect all file ids from all volumes, as set A
  33. 2. collect all file ids from the filer, as set B
  34. 3. find out the set A subtract B
  35. `
  36. }
  37. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. c.env = commandEnv
  39. // collect all volume id locations
  40. volumeIdToServer, err := c.collectVolumeIds()
  41. if err != nil {
  42. return fmt.Errorf("failed to collect all volume locations: %v", err)
  43. }
  44. // create a temp folder
  45. tempFolder, err := ioutil.TempDir("", "sw_fsck")
  46. if err != nil {
  47. return fmt.Errorf("failed to create temp folder: %v", err)
  48. }
  49. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  50. // collect each volume file ids
  51. for volumeId, vinfo := range volumeIdToServer {
  52. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo)
  53. if err != nil {
  54. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  55. }
  56. }
  57. // collect all filer file ids
  58. if err = c.collectFilerFileIds(tempFolder, volumeIdToServer); err != nil {
  59. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  60. }
  61. // volume file ids substract filer file ids
  62. var totalOrphanChunkCount, totalOrphanDataSize uint64
  63. for volumeId, server := range volumeIdToServer {
  64. orphanChunkCount, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId)
  65. if checkErr != nil {
  66. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, server, checkErr)
  67. }
  68. totalOrphanChunkCount += orphanChunkCount
  69. totalOrphanDataSize += orphanDataSize
  70. }
  71. if totalOrphanChunkCount > 0 {
  72. fmt.Fprintf(writer, "total %d orphan chunks, %d bytes\n", totalOrphanChunkCount, totalOrphanDataSize)
  73. } else {
  74. fmt.Fprintf(writer, "no orphan data\n")
  75. }
  76. return nil
  77. }
  78. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo) error {
  79. return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  80. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  81. VolumeId: volumeId,
  82. Ext: ".idx",
  83. CompactionRevision: math.MaxUint32,
  84. StopOffset: math.MaxInt64,
  85. Collection: vinfo.collection,
  86. IsEcVolume: vinfo.isEcVolume,
  87. IgnoreSourceFileNotFound: false,
  88. })
  89. if err != nil {
  90. return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
  91. }
  92. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  93. if err != nil {
  94. return fmt.Errorf("failed to copy %s.idx from %s: %v", volumeId, vinfo.server, err)
  95. }
  96. return nil
  97. })
  98. }
  99. func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo) error {
  100. files := make(map[uint32]*os.File)
  101. for vid := range volumeIdToServer {
  102. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  103. if openErr != nil {
  104. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  105. }
  106. files[vid] = dst
  107. }
  108. defer func() {
  109. for _, f := range files {
  110. f.Close()
  111. }
  112. }()
  113. type Item struct {
  114. vid uint32
  115. fileKey uint64
  116. }
  117. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
  118. buffer := make([]byte, 8)
  119. for item := range outputChan {
  120. i := item.(*Item)
  121. util.Uint64toBytes(buffer, i.fileKey)
  122. files[i.vid].Write(buffer)
  123. }
  124. }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  125. for _, chunk := range entry.Entry.Chunks {
  126. outputChan <- &Item{
  127. vid: chunk.Fid.VolumeId,
  128. fileKey: chunk.Fid.FileKey,
  129. }
  130. }
  131. return nil
  132. })
  133. }
  134. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32) (orphanChunkCount, orphanDataSize uint64, err error) {
  135. db := needle_map.NewMemDb()
  136. defer db.Close()
  137. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  138. return
  139. }
  140. filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  141. if err != nil {
  142. return
  143. }
  144. dataLen := len(filerFileIdsData)
  145. if dataLen%8 != 0 {
  146. return 0, 0, fmt.Errorf("filer data is corrupted")
  147. }
  148. for i := 0; i < len(filerFileIdsData); i += 8 {
  149. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  150. db.Delete(types.NeedleId(fileKey))
  151. }
  152. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  153. fmt.Printf("%d,%x\n", volumeId, n.Key)
  154. orphanChunkCount++
  155. orphanDataSize += uint64(n.Size)
  156. return nil
  157. })
  158. return
  159. }
  // VInfo records where a volume was found and how its .idx file should be requested.
type VInfo struct {
	server     string // data node id reported by the master; used as the volume server address
	collection string // collection the volume belongs to
	isEcVolume bool   // true when the volume was discovered through EC shard info
}
  165. func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[uint32]VInfo, err error) {
  166. volumeIdToServer = make(map[uint32]VInfo)
  167. var resp *master_pb.VolumeListResponse
  168. err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  169. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  170. return err
  171. })
  172. if err != nil {
  173. return
  174. }
  175. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  176. for _, vi := range t.VolumeInfos {
  177. volumeIdToServer[vi.Id] = VInfo{
  178. server: t.Id,
  179. collection: vi.Collection,
  180. isEcVolume: false,
  181. }
  182. }
  183. for _, ecShardInfo := range t.EcShardInfos {
  184. volumeIdToServer[ecShardInfo.Id] = VInfo{
  185. server: t.Id,
  186. collection: ecShardInfo.Collection,
  187. isEcVolume: true,
  188. }
  189. }
  190. })
  191. return
  192. }
  // getVolumeFileIdFile returns the path, inside tempFolder, of the copied
// .idx file for volume vid (named "<vid>.idx").
func getVolumeFileIdFile(tempFolder string, vid uint32) string {
	name := fmt.Sprint(vid) + ".idx"
	return filepath.Join(tempFolder, name)
}
  // getFilerFileIdFile returns the path, inside tempFolder, of the file that
// collects the filer-referenced file keys for volume vid (named "<vid>.fid").
func getFilerFileIdFile(tempFolder string, vid uint32) string {
	name := fmt.Sprint(vid) + ".fid"
	return filepath.Join(tempFolder, name)
}
  199. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  200. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  201. dst, err := os.OpenFile(fileName, flags, 0644)
  202. if err != nil {
  203. return nil
  204. }
  205. defer dst.Close()
  206. for {
  207. resp, receiveErr := client.Recv()
  208. if receiveErr == io.EOF {
  209. break
  210. }
  211. if receiveErr != nil {
  212. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  213. }
  214. dst.Write(resp.FileContent)
  215. }
  216. return nil
  217. }