You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

357 lines
10 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "math"
  9. "os"
  10. "path/filepath"
  11. "sync"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  17. "github.com/chrislusf/seaweedfs/weed/storage/types"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. func init() {
  21. Commands = append(Commands, &commandVolumeFsck{})
  22. }
  23. type commandVolumeFsck struct {
  24. env *CommandEnv
  25. }
  26. func (c *commandVolumeFsck) Name() string {
  27. return "volume.fsck"
  28. }
  29. func (c *commandVolumeFsck) Help() string {
  30. return `check all volumes to find entries not used by the filer
  31. Important assumption!!!
  32. the system is all used by one filer.
  33. This command works this way:
  34. 1. collect all file ids from all volumes, as set A
  35. 2. collect all file ids from the filer, as set B
  36. 3. find out the set A subtract B
  37. `
  38. }
  39. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  40. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  41. verbose := fsckCommand.Bool("v", false, "verbose mode")
  42. applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
  43. if err = fsckCommand.Parse(args); err != nil {
  44. return nil
  45. }
  46. c.env = commandEnv
  47. // create a temp folder
  48. tempFolder, err := ioutil.TempDir("", "sw_fsck")
  49. if err != nil {
  50. return fmt.Errorf("failed to create temp folder: %v", err)
  51. }
  52. if *verbose {
  53. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  54. }
  55. defer os.RemoveAll(tempFolder)
  56. // collect all volume id locations
  57. volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer)
  58. if err != nil {
  59. return fmt.Errorf("failed to collect all volume locations: %v", err)
  60. }
  61. // collect each volume file ids
  62. for volumeId, vinfo := range volumeIdToVInfo {
  63. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
  64. if err != nil {
  65. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  66. }
  67. }
  68. // collect all filer file ids
  69. if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
  70. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  71. }
  72. // volume file ids substract filer file ids
  73. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  74. for volumeId, vinfo := range volumeIdToVInfo {
  75. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
  76. if checkErr != nil {
  77. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  78. }
  79. totalInUseCount += inUseCount
  80. totalOrphanChunkCount += uint64(len(orphanFileIds))
  81. totalOrphanDataSize += orphanDataSize
  82. if *applyPurging && len(orphanFileIds) > 0 {
  83. if vinfo.isEcVolume {
  84. fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
  85. }
  86. if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
  87. return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
  88. }
  89. }
  90. }
  91. if totalOrphanChunkCount == 0 {
  92. fmt.Fprintf(writer, "no orphan data\n")
  93. return nil
  94. }
  95. if !*applyPurging {
  96. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  97. fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  98. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  99. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  100. }
  101. return nil
  102. }
  103. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
  104. if verbose {
  105. fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  106. }
  107. return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  108. ext := ".idx"
  109. if vinfo.isEcVolume {
  110. ext = ".ecx"
  111. }
  112. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  113. VolumeId: volumeId,
  114. Ext: ext,
  115. CompactionRevision: math.MaxUint32,
  116. StopOffset: math.MaxInt64,
  117. Collection: vinfo.collection,
  118. IsEcVolume: vinfo.isEcVolume,
  119. IgnoreSourceFileNotFound: false,
  120. })
  121. if err != nil {
  122. return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
  123. }
  124. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  125. if err != nil {
  126. return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err)
  127. }
  128. return nil
  129. })
  130. }
  131. func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error {
  132. if verbose {
  133. fmt.Fprintf(writer, "collecting file ids from filer ...\n")
  134. }
  135. files := make(map[uint32]*os.File)
  136. for vid := range volumeIdToServer {
  137. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  138. if openErr != nil {
  139. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  140. }
  141. files[vid] = dst
  142. }
  143. defer func() {
  144. for _, f := range files {
  145. f.Close()
  146. }
  147. }()
  148. type Item struct {
  149. vid uint32
  150. fileKey uint64
  151. }
  152. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
  153. buffer := make([]byte, 8)
  154. for item := range outputChan {
  155. i := item.(*Item)
  156. util.Uint64toBytes(buffer, i.fileKey)
  157. files[i.vid].Write(buffer)
  158. }
  159. }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  160. for _, chunk := range entry.Entry.Chunks {
  161. outputChan <- &Item{
  162. vid: chunk.Fid.VolumeId,
  163. fileKey: chunk.Fid.FileKey,
  164. }
  165. }
  166. return nil
  167. })
  168. }
  169. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  170. db := needle_map.NewMemDb()
  171. defer db.Close()
  172. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  173. return
  174. }
  175. filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  176. if err != nil {
  177. return
  178. }
  179. dataLen := len(filerFileIdsData)
  180. if dataLen%8 != 0 {
  181. return 0, nil, 0, fmt.Errorf("filer data is corrupted")
  182. }
  183. for i := 0; i < len(filerFileIdsData); i += 8 {
  184. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  185. db.Delete(types.NeedleId(fileKey))
  186. inUseCount++
  187. }
  188. var orphanFileCount uint64
  189. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  190. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  191. orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
  192. orphanFileCount++
  193. orphanDataSize += uint64(n.Size)
  194. return nil
  195. })
  196. if orphanFileCount > 0 {
  197. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  198. fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  199. volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  200. }
  201. return
  202. }
  // VInfo describes where one volume can be reached and what kind of volume it
  // is, as collected from the master's topology.
  type VInfo struct {
  	// server is the id of the data node that reported the volume.
  	server string
  	// collection is the collection name reported for the volume.
  	collection string
  	// isEcVolume is true when the entry came from the node's EC shard list.
  	isEcVolume bool
  }
  208. func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
  209. if verbose {
  210. fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
  211. }
  212. volumeIdToServer = make(map[uint32]VInfo)
  213. var resp *master_pb.VolumeListResponse
  214. err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  215. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  216. return err
  217. })
  218. if err != nil {
  219. return
  220. }
  221. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  222. for _, vi := range t.VolumeInfos {
  223. volumeIdToServer[vi.Id] = VInfo{
  224. server: t.Id,
  225. collection: vi.Collection,
  226. isEcVolume: false,
  227. }
  228. }
  229. for _, ecShardInfo := range t.EcShardInfos {
  230. volumeIdToServer[ecShardInfo.Id] = VInfo{
  231. server: t.Id,
  232. collection: ecShardInfo.Collection,
  233. isEcVolume: true,
  234. }
  235. }
  236. })
  237. if verbose {
  238. fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
  239. }
  240. return
  241. }
  242. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
  243. fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
  244. locations, found := c.env.MasterClient.GetLocations(volumeId)
  245. if !found {
  246. return fmt.Errorf("failed to find volume %d locations", volumeId)
  247. }
  248. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  249. var wg sync.WaitGroup
  250. for _, location := range locations {
  251. wg.Add(1)
  252. go func(server string, fidList []string) {
  253. defer wg.Done()
  254. if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  255. err = deleteErr
  256. } else if deleteResults != nil {
  257. resultChan <- deleteResults
  258. }
  259. }(location.Url, fileIds)
  260. }
  261. wg.Wait()
  262. close(resultChan)
  263. for results := range resultChan {
  264. for _, result := range results {
  265. if result.Error != "" {
  266. fmt.Fprintf(writer, "purge error: %s\n", result.Error)
  267. }
  268. }
  269. }
  270. return
  271. }
  // getVolumeFileIdFile returns the path, inside tempFolder, of the local copy of
  // the index of volume vid: "<tempFolder>/<vid>.idx".
  func getVolumeFileIdFile(tempFolder string, vid uint32) string {
  	fileName := fmt.Sprint(vid) + ".idx"
  	return filepath.Join(tempFolder, fileName)
  }
  // getFilerFileIdFile returns the path, inside tempFolder, of the file holding
  // the file keys the filer references in volume vid: "<tempFolder>/<vid>.fid".
  func getFilerFileIdFile(tempFolder string, vid uint32) string {
  	fileName := fmt.Sprint(vid) + ".fid"
  	return filepath.Join(tempFolder, fileName)
  }
  278. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  279. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  280. dst, err := os.OpenFile(fileName, flags, 0644)
  281. if err != nil {
  282. return nil
  283. }
  284. defer dst.Close()
  285. for {
  286. resp, receiveErr := client.Recv()
  287. if receiveErr == io.EOF {
  288. break
  289. }
  290. if receiveErr != nil {
  291. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  292. }
  293. dst.Write(resp.FileContent)
  294. }
  295. return nil
  296. }