You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

547 lines
16 KiB

4 years ago
5 years ago
4 years ago
5 years ago
5 years ago
4 years ago
3 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
  1. package shell
  2. import (
  3. "bufio"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "io"
  10. "io/ioutil"
  11. "math"
  12. "os"
  13. "path/filepath"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/operation"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  19. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  20. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  21. "github.com/chrislusf/seaweedfs/weed/storage/types"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. )
  24. func init() {
  25. Commands = append(Commands, &commandVolumeFsck{})
  26. }
  27. type commandVolumeFsck struct {
  28. env *CommandEnv
  29. }
  30. func (c *commandVolumeFsck) Name() string {
  31. return "volume.fsck"
  32. }
  33. func (c *commandVolumeFsck) Help() string {
  34. return `check all volumes to find entries not used by the filer
  35. Important assumption!!!
  36. the system is all used by one filer.
  37. This command works this way:
  38. 1. collect all file ids from all volumes, as set A
  39. 2. collect all file ids from the filer, as set B
  40. 3. find out the set A subtract B
  41. If -findMissingChunksInFiler is enabled, this works
  42. in a reverse way:
  43. 1. collect all file ids from all volumes, as set A
  44. 2. collect all file ids from the filer, as set B
  45. 3. find out the set B subtract A
  46. `
  47. }
  48. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  49. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  50. verbose := fsckCommand.Bool("v", false, "verbose mode")
  51. findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
  52. findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
  53. applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
  54. if err = fsckCommand.Parse(args); err != nil {
  55. return nil
  56. }
  57. if err = commandEnv.confirmIsLocked(); err != nil {
  58. return
  59. }
  60. c.env = commandEnv
  61. // create a temp folder
  62. tempFolder, err := ioutil.TempDir("", "sw_fsck")
  63. if err != nil {
  64. return fmt.Errorf("failed to create temp folder: %v", err)
  65. }
  66. if *verbose {
  67. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  68. }
  69. defer os.RemoveAll(tempFolder)
  70. // collect all volume id locations
  71. volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
  72. if err != nil {
  73. return fmt.Errorf("failed to collect all volume locations: %v", err)
  74. }
  75. // collect each volume file ids
  76. for volumeId, vinfo := range volumeIdToVInfo {
  77. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
  78. if err != nil {
  79. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  80. }
  81. }
  82. if *findMissingChunksInFiler {
  83. // collect all filer file ids and paths
  84. if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil {
  85. return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
  86. }
  87. // for each volume, check filer file ids
  88. if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
  89. return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
  90. }
  91. } else {
  92. // collect all filer file ids
  93. if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
  94. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  95. }
  96. // volume file ids substract filer file ids
  97. if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
  98. return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
  99. }
  100. }
  101. return nil
  102. }
  103. func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error {
  104. if verbose {
  105. fmt.Fprintf(writer, "checking each file from filer ...\n")
  106. }
  107. files := make(map[uint32]*os.File)
  108. for vid := range volumeIdToServer {
  109. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  110. if openErr != nil {
  111. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  112. }
  113. files[vid] = dst
  114. }
  115. defer func() {
  116. for _, f := range files {
  117. f.Close()
  118. }
  119. }()
  120. type Item struct {
  121. vid uint32
  122. fileKey uint64
  123. cookie uint32
  124. path util.FullPath
  125. }
  126. return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) {
  127. buffer := make([]byte, 16)
  128. for item := range outputChan {
  129. i := item.(*Item)
  130. if f, ok := files[i.vid]; ok {
  131. util.Uint64toBytes(buffer, i.fileKey)
  132. util.Uint32toBytes(buffer[8:], i.cookie)
  133. util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
  134. f.Write(buffer)
  135. f.Write([]byte(i.path))
  136. // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
  137. } else {
  138. fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
  139. }
  140. }
  141. }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  142. if verbose && entry.Entry.IsDirectory {
  143. fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
  144. }
  145. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  146. if resolveErr != nil {
  147. return nil
  148. }
  149. dChunks = append(dChunks, mChunks...)
  150. for _, chunk := range dChunks {
  151. outputChan <- &Item{
  152. vid: chunk.Fid.VolumeId,
  153. fileKey: chunk.Fid.FileKey,
  154. cookie: chunk.Fid.Cookie,
  155. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  156. }
  157. }
  158. return nil
  159. })
  160. return nil
  161. }
  162. func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
  163. for volumeId, vinfo := range volumeIdToVInfo {
  164. checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose)
  165. if checkErr != nil {
  166. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  167. }
  168. }
  169. return nil
  170. }
  171. func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
  172. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  173. for volumeId, vinfo := range volumeIdToVInfo {
  174. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
  175. if checkErr != nil {
  176. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  177. }
  178. totalInUseCount += inUseCount
  179. totalOrphanChunkCount += uint64(len(orphanFileIds))
  180. totalOrphanDataSize += orphanDataSize
  181. if verbose {
  182. for _, fid := range orphanFileIds {
  183. fmt.Fprintf(writer, "%s\n", fid)
  184. }
  185. }
  186. if *applyPurging && len(orphanFileIds) > 0 {
  187. if vinfo.isEcVolume {
  188. fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
  189. }
  190. if inUseCount == 0 {
  191. if err := deleteVolume(c.env.option.GrpcDialOption, needle.VolumeId(volumeId), vinfo.server); err != nil {
  192. return fmt.Errorf("delete volume %d: %v\n", volumeId, err)
  193. }
  194. } else {
  195. if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
  196. return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
  197. }
  198. }
  199. }
  200. }
  201. if totalOrphanChunkCount == 0 {
  202. fmt.Fprintf(writer, "no orphan data\n")
  203. return nil
  204. }
  205. if !*applyPurging {
  206. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  207. fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  208. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  209. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  210. }
  211. return nil
  212. }
  213. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
  214. if verbose {
  215. fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  216. }
  217. return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  218. ext := ".idx"
  219. if vinfo.isEcVolume {
  220. ext = ".ecx"
  221. }
  222. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  223. VolumeId: volumeId,
  224. Ext: ext,
  225. CompactionRevision: math.MaxUint32,
  226. StopOffset: math.MaxInt64,
  227. Collection: vinfo.collection,
  228. IsEcVolume: vinfo.isEcVolume,
  229. IgnoreSourceFileNotFound: false,
  230. })
  231. if err != nil {
  232. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  233. }
  234. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  235. if err != nil {
  236. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
  237. }
  238. return nil
  239. })
  240. }
  241. func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error {
  242. if verbose {
  243. fmt.Fprintf(writer, "collecting file ids from filer ...\n")
  244. }
  245. files := make(map[uint32]*os.File)
  246. for vid := range volumeIdToServer {
  247. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  248. if openErr != nil {
  249. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  250. }
  251. files[vid] = dst
  252. }
  253. defer func() {
  254. for _, f := range files {
  255. f.Close()
  256. }
  257. }()
  258. type Item struct {
  259. vid uint32
  260. fileKey uint64
  261. }
  262. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
  263. buffer := make([]byte, 8)
  264. for item := range outputChan {
  265. i := item.(*Item)
  266. util.Uint64toBytes(buffer, i.fileKey)
  267. files[i.vid].Write(buffer)
  268. }
  269. }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  270. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  271. if resolveErr != nil {
  272. if verbose {
  273. fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
  274. }
  275. return nil
  276. }
  277. dChunks = append(dChunks, mChunks...)
  278. for _, chunk := range dChunks {
  279. outputChan <- &Item{
  280. vid: chunk.Fid.VolumeId,
  281. fileKey: chunk.Fid.FileKey,
  282. }
  283. }
  284. return nil
  285. })
  286. }
  287. func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
  288. if verbose {
  289. fmt.Fprintf(writer, "find missing file chuns in volume %d ...\n", volumeId)
  290. }
  291. db := needle_map.NewMemDb()
  292. defer db.Close()
  293. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  294. return
  295. }
  296. file := getFilerFileIdFile(tempFolder, volumeId)
  297. fp, err := os.Open(file)
  298. if err != nil {
  299. return
  300. }
  301. defer fp.Close()
  302. type Item struct {
  303. fileKey uint64
  304. cookie uint32
  305. path util.FullPath
  306. }
  307. br := bufio.NewReader(fp)
  308. buffer := make([]byte, 16)
  309. item := &Item{}
  310. var readSize int
  311. for {
  312. readSize, err = io.ReadFull(br, buffer)
  313. if err != nil || readSize != 16 {
  314. if err == io.EOF {
  315. return nil
  316. } else {
  317. break
  318. }
  319. }
  320. item.fileKey = util.BytesToUint64(buffer[:8])
  321. item.cookie = util.BytesToUint32(buffer[8:12])
  322. pathSize := util.BytesToUint32(buffer[12:16])
  323. pathBytes := make([]byte, int(pathSize))
  324. n, err := io.ReadFull(br, pathBytes)
  325. if err != nil {
  326. fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
  327. }
  328. if n != int(pathSize) {
  329. fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
  330. }
  331. item.path = util.FullPath(string(pathBytes))
  332. if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
  333. fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize)
  334. }
  335. }
  336. return
  337. }
  338. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  339. db := needle_map.NewMemDb()
  340. defer db.Close()
  341. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  342. return
  343. }
  344. filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  345. if err != nil {
  346. return
  347. }
  348. dataLen := len(filerFileIdsData)
  349. if dataLen%8 != 0 {
  350. return 0, nil, 0, fmt.Errorf("filer data is corrupted")
  351. }
  352. for i := 0; i < len(filerFileIdsData); i += 8 {
  353. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  354. db.Delete(types.NeedleId(fileKey))
  355. inUseCount++
  356. }
  357. var orphanFileCount uint64
  358. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  359. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  360. orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
  361. orphanFileCount++
  362. orphanDataSize += uint64(n.Size)
  363. return nil
  364. })
  365. if orphanFileCount > 0 {
  366. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  367. fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  368. volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  369. }
  370. return
  371. }
  372. type VInfo struct {
  373. server pb.ServerAddress
  374. collection string
  375. isEcVolume bool
  376. }
  377. func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
  378. if verbose {
  379. fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
  380. }
  381. volumeIdToServer = make(map[uint32]VInfo)
  382. // collect topology information
  383. topologyInfo, _, err := collectTopologyInfo(commandEnv)
  384. if err != nil {
  385. return
  386. }
  387. eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  388. for _, diskInfo := range t.DiskInfos {
  389. for _, vi := range diskInfo.VolumeInfos {
  390. volumeIdToServer[vi.Id] = VInfo{
  391. server: pb.NewServerAddressFromDataNode(t),
  392. collection: vi.Collection,
  393. isEcVolume: false,
  394. }
  395. }
  396. for _, ecShardInfo := range diskInfo.EcShardInfos {
  397. volumeIdToServer[ecShardInfo.Id] = VInfo{
  398. server: pb.NewServerAddressFromDataNode(t),
  399. collection: ecShardInfo.Collection,
  400. isEcVolume: true,
  401. }
  402. }
  403. }
  404. })
  405. if verbose {
  406. fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
  407. }
  408. return
  409. }
  410. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
  411. fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
  412. locations, found := c.env.MasterClient.GetLocations(volumeId)
  413. if !found {
  414. return fmt.Errorf("failed to find volume %d locations", volumeId)
  415. }
  416. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  417. var wg sync.WaitGroup
  418. for _, location := range locations {
  419. wg.Add(1)
  420. go func(server pb.ServerAddress, fidList []string) {
  421. defer wg.Done()
  422. if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  423. err = deleteErr
  424. } else if deleteResults != nil {
  425. resultChan <- deleteResults
  426. }
  427. }(location.ServerAddress(), fileIds)
  428. }
  429. wg.Wait()
  430. close(resultChan)
  431. for results := range resultChan {
  432. for _, result := range results {
  433. if result.Error != "" {
  434. fmt.Fprintf(writer, "purge error: %s\n", result.Error)
  435. }
  436. }
  437. }
  438. return
  439. }
  440. func getVolumeFileIdFile(tempFolder string, vid uint32) string {
  441. return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
  442. }
  443. func getFilerFileIdFile(tempFolder string, vid uint32) string {
  444. return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
  445. }
  446. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  447. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  448. dst, err := os.OpenFile(fileName, flags, 0644)
  449. if err != nil {
  450. return nil
  451. }
  452. defer dst.Close()
  453. for {
  454. resp, receiveErr := client.Recv()
  455. if receiveErr == io.EOF {
  456. break
  457. }
  458. if receiveErr != nil {
  459. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  460. }
  461. dst.Write(resp.FileContent)
  462. }
  463. return nil
  464. }