You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

271 lines
7.4 KiB

5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path/filepath"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  15. "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. func init() {
  19. Commands = append(Commands, &commandVolumeFsck{})
  20. }
// commandVolumeFsck implements the "volume.fsck" command, which reports
// volume entries that are not referenced by the filer.
type commandVolumeFsck struct {
	// env is the command environment; Do sets it from its commandEnv
	// argument before doing any work.
	env *CommandEnv
}
  24. func (c *commandVolumeFsck) Name() string {
  25. return "volume.fsck"
  26. }
// Help returns the help text of the command: what it checks, the
// single-filer assumption it relies on, and the three steps it performs.
func (c *commandVolumeFsck) Help() string {
	return `check all volumes to find entries not used by the filer
Important assumption!!!
the system is all used by one filer.
This command works this way:
1. collect all file ids from all volumes, as set A
2. collect all file ids from the filer, as set B
3. find out the set A subtract B
`
}
  37. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. c.env = commandEnv
  39. // collect all volume id locations
  40. volumeIdToServer, err := c.collectVolumeIds()
  41. if err != nil {
  42. return fmt.Errorf("failed to collect all volume locations: %v", err)
  43. }
  44. // create a temp folder
  45. tempFolder, err := ioutil.TempDir("", "sw_fsck")
  46. if err != nil {
  47. return fmt.Errorf("failed to create temp folder: %v", err)
  48. }
  49. // fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  50. // collect each volume file ids
  51. for volumeId, vinfo := range volumeIdToServer {
  52. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo)
  53. if err != nil {
  54. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  55. }
  56. }
  57. // collect all filer file ids
  58. if err = c.collectFilerFileIds(tempFolder, volumeIdToServer); err != nil {
  59. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  60. }
  61. // volume file ids substract filer file ids
  62. var totalOrphanChunkCount, totalOrphanDataSize uint64
  63. for volumeId, server := range volumeIdToServer {
  64. orphanChunkCount, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer)
  65. if checkErr != nil {
  66. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, server, checkErr)
  67. }
  68. totalOrphanChunkCount += orphanChunkCount
  69. totalOrphanDataSize += orphanDataSize
  70. }
  71. if totalOrphanChunkCount > 0 {
  72. fmt.Fprintf(writer, "\ntotal\t%d orphan entries\t%d bytes not used by filer http://%s:%d/\n",
  73. totalOrphanChunkCount, totalOrphanDataSize, c.env.option.FilerHost, c.env.option.FilerPort)
  74. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  75. } else {
  76. fmt.Fprintf(writer, "no orphan data\n")
  77. }
  78. os.RemoveAll(tempFolder)
  79. return nil
  80. }
  81. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo) error {
  82. return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  83. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  84. VolumeId: volumeId,
  85. Ext: ".idx",
  86. CompactionRevision: math.MaxUint32,
  87. StopOffset: math.MaxInt64,
  88. Collection: vinfo.collection,
  89. IsEcVolume: vinfo.isEcVolume,
  90. IgnoreSourceFileNotFound: false,
  91. })
  92. if err != nil {
  93. return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
  94. }
  95. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  96. if err != nil {
  97. return fmt.Errorf("failed to copy %s.idx from %s: %v", volumeId, vinfo.server, err)
  98. }
  99. return nil
  100. })
  101. }
  102. func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo) error {
  103. files := make(map[uint32]*os.File)
  104. for vid := range volumeIdToServer {
  105. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  106. if openErr != nil {
  107. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  108. }
  109. files[vid] = dst
  110. }
  111. defer func() {
  112. for _, f := range files {
  113. f.Close()
  114. }
  115. }()
  116. type Item struct {
  117. vid uint32
  118. fileKey uint64
  119. }
  120. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
  121. buffer := make([]byte, 8)
  122. for item := range outputChan {
  123. i := item.(*Item)
  124. util.Uint64toBytes(buffer, i.fileKey)
  125. files[i.vid].Write(buffer)
  126. }
  127. }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  128. for _, chunk := range entry.Entry.Chunks {
  129. outputChan <- &Item{
  130. vid: chunk.Fid.VolumeId,
  131. fileKey: chunk.Fid.FileKey,
  132. }
  133. }
  134. return nil
  135. })
  136. }
  137. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer) (orphanChunkCount, orphanDataSize uint64, err error) {
  138. db := needle_map.NewMemDb()
  139. defer db.Close()
  140. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  141. return
  142. }
  143. filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  144. if err != nil {
  145. return
  146. }
  147. dataLen := len(filerFileIdsData)
  148. if dataLen%8 != 0 {
  149. return 0, 0, fmt.Errorf("filer data is corrupted")
  150. }
  151. for i := 0; i < len(filerFileIdsData); i += 8 {
  152. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  153. db.Delete(types.NeedleId(fileKey))
  154. }
  155. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  156. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  157. orphanChunkCount++
  158. orphanDataSize += uint64(n.Size)
  159. return nil
  160. })
  161. if orphanChunkCount > 0 {
  162. fmt.Fprintf(writer, "volume %d\t%d orphan entries\t%d bytes\n", volumeId, orphanChunkCount, orphanDataSize)
  163. }
  164. return
  165. }
// VInfo describes one volume as collected by collectVolumeIds.
type VInfo struct {
	// server is the id of the data node that reported the volume.
	server string
	// collection is the collection name reported for the volume.
	collection string
	// isEcVolume is true when the entry came from the data node's EC shard
	// infos rather than its volume infos.
	isEcVolume bool
}
  171. func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[uint32]VInfo, err error) {
  172. volumeIdToServer = make(map[uint32]VInfo)
  173. var resp *master_pb.VolumeListResponse
  174. err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  175. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  176. return err
  177. })
  178. if err != nil {
  179. return
  180. }
  181. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  182. for _, vi := range t.VolumeInfos {
  183. volumeIdToServer[vi.Id] = VInfo{
  184. server: t.Id,
  185. collection: vi.Collection,
  186. isEcVolume: false,
  187. }
  188. }
  189. for _, ecShardInfo := range t.EcShardInfos {
  190. volumeIdToServer[ecShardInfo.Id] = VInfo{
  191. server: t.Id,
  192. collection: ecShardInfo.Collection,
  193. isEcVolume: true,
  194. }
  195. }
  196. })
  197. return
  198. }
  199. func getVolumeFileIdFile(tempFolder string, vid uint32) string {
  200. return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
  201. }
  202. func getFilerFileIdFile(tempFolder string, vid uint32) string {
  203. return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
  204. }
  205. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  206. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  207. dst, err := os.OpenFile(fileName, flags, 0644)
  208. if err != nil {
  209. return nil
  210. }
  211. defer dst.Close()
  212. for {
  213. resp, receiveErr := client.Recv()
  214. if receiveErr == io.EOF {
  215. break
  216. }
  217. if receiveErr != nil {
  218. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  219. }
  220. dst.Write(resp.FileContent)
  221. }
  222. return nil
  223. }