You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

693 lines
22 KiB

4 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
5 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
3 years ago
5 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
  1. package shell
  2. import (
  3. "bufio"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  15. "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "io"
  18. "io/ioutil"
  19. "math"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "strings"
  26. "sync"
  27. "time"
  28. )
  29. func init() {
  30. Commands = append(Commands, &commandVolumeFsck{})
  31. }
  32. type commandVolumeFsck struct {
  33. env *CommandEnv
  34. }
  35. func (c *commandVolumeFsck) Name() string {
  36. return "volume.fsck"
  37. }
  38. func (c *commandVolumeFsck) Help() string {
  39. return `check all volumes to find entries not used by the filer
  40. Important assumption!!!
  41. the system is all used by one filer.
  42. This command works this way:
  43. 1. collect all file ids from all volumes, as set A
  44. 2. collect all file ids from the filer, as set B
  45. 3. find out the set A subtract B
  46. If -findMissingChunksInFiler is enabled, this works
  47. in a reverse way:
  48. 1. collect all file ids from all volumes, as set A
  49. 2. collect all file ids from the filer, as set B
  50. 3. find out the set B subtract A
  51. `
  52. }
  53. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  54. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  55. verbose := fsckCommand.Bool("v", false, "verbose mode")
  56. findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
  57. findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
  58. findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler")
  59. applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
  60. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
  61. tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
  62. if err = fsckCommand.Parse(args); err != nil {
  63. return nil
  64. }
  65. if err = commandEnv.confirmIsLocked(args); err != nil {
  66. return
  67. }
  68. c.env = commandEnv
  69. // create a temp folder
  70. tempFolder, err := os.MkdirTemp(*tempPath, "sw_fsck")
  71. if err != nil {
  72. return fmt.Errorf("failed to create temp folder: %v", err)
  73. }
  74. if *verbose {
  75. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  76. }
  77. defer os.RemoveAll(tempFolder)
  78. // collect all volume id locations
  79. dataNodeVolumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
  80. if err != nil {
  81. return fmt.Errorf("failed to collect all volume locations: %v", err)
  82. }
  83. isBucketsPath := false
  84. var fillerBucketsPath string
  85. if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "/" {
  86. fillerBucketsPath, err = readFilerBucketsPath(commandEnv)
  87. if err != nil {
  88. return fmt.Errorf("read filer buckets path: %v", err)
  89. }
  90. if strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath) {
  91. isBucketsPath = true
  92. }
  93. }
  94. if err != nil {
  95. return fmt.Errorf("read filer buckets path: %v", err)
  96. }
  97. collectMtime := time.Now().Unix()
  98. // collect each volume file ids
  99. for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  100. for volumeId, vinfo := range volumeIdToVInfo {
  101. if *findMissingChunksInVolumeId > 0 && uint32(*findMissingChunksInVolumeId) != volumeId {
  102. delete(volumeIdToVInfo, volumeId)
  103. continue
  104. }
  105. if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) {
  106. delete(volumeIdToVInfo, volumeId)
  107. continue
  108. }
  109. err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer)
  110. if err != nil {
  111. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  112. }
  113. }
  114. }
  115. if *findMissingChunksInFiler {
  116. // collect all filer file ids and paths
  117. if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent, collectMtime); err != nil {
  118. return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
  119. }
  120. for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  121. // for each volume, check filer file ids
  122. if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, dataNodeId, writer, *verbose, *applyPurging); err != nil {
  123. return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
  124. }
  125. }
  126. } else {
  127. // collect all filer file ids
  128. if err = c.collectFilerFileIds(dataNodeVolumeIdToVInfo, tempFolder, writer, *verbose); err != nil {
  129. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  130. }
  131. // volume file ids subtract filer file ids
  132. if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
  133. return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
  134. }
  135. }
  136. return nil
  137. }
  138. func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool, collectMtime int64) error {
  139. if verbose {
  140. fmt.Fprintf(writer, "checking each file from filer ...\n")
  141. }
  142. files := make(map[uint32]*os.File)
  143. for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
  144. for vid := range volumeIdToServer {
  145. if _, ok := files[vid]; ok {
  146. continue
  147. }
  148. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  149. if openErr != nil {
  150. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  151. }
  152. files[vid] = dst
  153. }
  154. }
  155. defer func() {
  156. for _, f := range files {
  157. f.Close()
  158. }
  159. }()
  160. type Item struct {
  161. vid uint32
  162. fileKey uint64
  163. cookie uint32
  164. path util.FullPath
  165. }
  166. return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  167. if verbose && entry.Entry.IsDirectory {
  168. fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
  169. }
  170. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  171. if resolveErr != nil {
  172. return nil
  173. }
  174. dataChunks = append(dataChunks, manifestChunks...)
  175. for _, chunk := range dataChunks {
  176. if chunk.Mtime > collectMtime {
  177. continue
  178. }
  179. outputChan <- &Item{
  180. vid: chunk.Fid.VolumeId,
  181. fileKey: chunk.Fid.FileKey,
  182. cookie: chunk.Fid.Cookie,
  183. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  184. }
  185. }
  186. return nil
  187. }, func(outputChan chan interface{}) {
  188. buffer := make([]byte, 16)
  189. for item := range outputChan {
  190. i := item.(*Item)
  191. if f, ok := files[i.vid]; ok {
  192. util.Uint64toBytes(buffer, i.fileKey)
  193. util.Uint32toBytes(buffer[8:], i.cookie)
  194. util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
  195. f.Write(buffer)
  196. f.Write([]byte(i.path))
  197. // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
  198. } else {
  199. fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
  200. if purgeAbsent {
  201. fmt.Printf("deleting path %s after volume not found", i.path)
  202. c.httpDelete(i.path, verbose)
  203. }
  204. }
  205. }
  206. })
  207. }
  208. func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, dataNodeId string, writer io.Writer, verbose bool, applyPurging bool) error {
  209. for volumeId, vinfo := range volumeIdToVInfo {
  210. checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, dataNodeId, volumeId, writer, verbose, applyPurging)
  211. if checkErr != nil {
  212. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  213. }
  214. }
  215. return nil
  216. }
  217. func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
  218. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  219. volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
  220. isSeveralReplicas := make(map[uint32]bool)
  221. isEcVolumeReplicas := make(map[uint32]bool)
  222. isReadOnlyReplicas := make(map[uint32]bool)
  223. serverReplicas := make(map[uint32][]pb.ServerAddress)
  224. for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  225. for volumeId, vinfo := range volumeIdToVInfo {
  226. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, dataNodeId, volumeId, writer, verbose)
  227. if checkErr != nil {
  228. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  229. }
  230. isSeveralReplicas[volumeId] = false
  231. if _, found := volumeIdOrphanFileIds[volumeId]; !found {
  232. volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
  233. } else {
  234. isSeveralReplicas[volumeId] = true
  235. }
  236. for _, fid := range orphanFileIds {
  237. if isSeveralReplicas[volumeId] {
  238. if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
  239. continue
  240. }
  241. }
  242. volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
  243. }
  244. totalInUseCount += inUseCount
  245. totalOrphanChunkCount += uint64(len(orphanFileIds))
  246. totalOrphanDataSize += orphanDataSize
  247. if verbose {
  248. for _, fid := range orphanFileIds {
  249. fmt.Fprintf(writer, "%s\n", fid)
  250. }
  251. }
  252. isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
  253. if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
  254. isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
  255. }
  256. serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
  257. }
  258. for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
  259. if !(applyPurging && len(orphanReplicaFileIds) > 0) {
  260. continue
  261. }
  262. orphanFileIds := []string{}
  263. for fid, foundInAllReplicas := range orphanReplicaFileIds {
  264. if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
  265. orphanFileIds = append(orphanFileIds, fid)
  266. }
  267. }
  268. if !(len(orphanFileIds) > 0) {
  269. continue
  270. }
  271. if verbose {
  272. fmt.Fprintf(writer, "purging process for volume %d", volumeId)
  273. }
  274. if isEcVolumeReplicas[volumeId] {
  275. fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
  276. continue
  277. }
  278. for _, server := range serverReplicas[volumeId] {
  279. needleVID := needle.VolumeId(volumeId)
  280. if isReadOnlyReplicas[volumeId] {
  281. err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
  282. if err != nil {
  283. return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
  284. }
  285. fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
  286. defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
  287. fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
  288. }
  289. if verbose {
  290. fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
  291. }
  292. if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
  293. return fmt.Errorf("purging volume %d: %v", volumeId, err)
  294. }
  295. }
  296. }
  297. }
  298. if !applyPurging {
  299. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  300. fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  301. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  302. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  303. }
  304. if totalOrphanChunkCount == 0 {
  305. fmt.Fprintf(writer, "no orphan data\n")
  306. //return nil
  307. }
  308. return nil
  309. }
  310. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
  311. if verbose {
  312. fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  313. }
  314. return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  315. ext := ".idx"
  316. if vinfo.isEcVolume {
  317. ext = ".ecx"
  318. }
  319. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  320. VolumeId: volumeId,
  321. Ext: ext,
  322. CompactionRevision: math.MaxUint32,
  323. StopOffset: math.MaxInt64,
  324. Collection: vinfo.collection,
  325. IsEcVolume: vinfo.isEcVolume,
  326. IgnoreSourceFileNotFound: false,
  327. })
  328. if err != nil {
  329. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  330. }
  331. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId))
  332. if err != nil {
  333. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
  334. }
  335. return nil
  336. })
  337. }
  338. func (c *commandVolumeFsck) collectFilerFileIds(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error {
  339. if verbose {
  340. fmt.Fprintf(writer, "collecting file ids from filer ...\n")
  341. }
  342. files := make(map[uint32]*os.File)
  343. for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
  344. for vid := range volumeIdToServer {
  345. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  346. if openErr != nil {
  347. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  348. }
  349. files[vid] = dst
  350. }
  351. }
  352. defer func() {
  353. for _, f := range files {
  354. f.Close()
  355. }
  356. }()
  357. type Item struct {
  358. vid uint32
  359. fileKey uint64
  360. }
  361. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  362. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  363. if resolveErr != nil {
  364. if verbose {
  365. fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
  366. }
  367. return nil
  368. }
  369. dataChunks = append(dataChunks, manifestChunks...)
  370. for _, chunk := range dataChunks {
  371. outputChan <- &Item{
  372. vid: chunk.Fid.VolumeId,
  373. fileKey: chunk.Fid.FileKey,
  374. }
  375. }
  376. return nil
  377. }, func(outputChan chan interface{}) {
  378. buffer := make([]byte, 8)
  379. for item := range outputChan {
  380. i := item.(*Item)
  381. util.Uint64toBytes(buffer, i.fileKey)
  382. files[i.vid].Write(buffer)
  383. }
  384. })
  385. }
  386. func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, dataNodeId string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) {
  387. if verbose {
  388. fmt.Fprintf(writer, "find missing file chunks in dataNodeId %s volume %d ...\n", dataNodeId, volumeId)
  389. }
  390. db := needle_map.NewMemDb()
  391. defer db.Close()
  392. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)); err != nil {
  393. return
  394. }
  395. file := getFilerFileIdFile(tempFolder, volumeId)
  396. fp, err := os.Open(file)
  397. if err != nil {
  398. return
  399. }
  400. defer fp.Close()
  401. type Item struct {
  402. fileKey uint64
  403. cookie uint32
  404. path util.FullPath
  405. }
  406. br := bufio.NewReader(fp)
  407. buffer := make([]byte, 16)
  408. item := &Item{}
  409. var readSize int
  410. for {
  411. readSize, err = io.ReadFull(br, buffer)
  412. if err != nil || readSize != 16 {
  413. break
  414. }
  415. item.fileKey = util.BytesToUint64(buffer[:8])
  416. item.cookie = util.BytesToUint32(buffer[8:12])
  417. pathSize := util.BytesToUint32(buffer[12:16])
  418. pathBytes := make([]byte, int(pathSize))
  419. n, err := io.ReadFull(br, pathBytes)
  420. if err != nil {
  421. fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
  422. }
  423. if n != int(pathSize) {
  424. fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
  425. }
  426. item.path = util.FullPath(string(pathBytes))
  427. needleId := types.NeedleId(item.fileKey)
  428. if _, found := db.Get(needleId); !found {
  429. fmt.Fprintf(writer, "%s\n", item.path)
  430. if applyPurging {
  431. // defining the URL this way automatically escapes complex path names
  432. c.httpDelete(item.path, verbose)
  433. }
  434. }
  435. }
  436. return nil
  437. }
  438. func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
  439. req, err := http.NewRequest(http.MethodDelete, "", nil)
  440. req.URL = &url.URL{
  441. Scheme: "http",
  442. Host: c.env.option.FilerAddress.ToHttpAddress(),
  443. Path: string(path),
  444. }
  445. if verbose {
  446. fmt.Printf("full HTTP delete request to be sent: %v\n", req)
  447. }
  448. if err != nil {
  449. fmt.Errorf("HTTP delete request error: %v\n", err)
  450. }
  451. client := &http.Client{}
  452. resp, err := client.Do(req)
  453. if err != nil {
  454. fmt.Errorf("DELETE fetch error: %v\n", err)
  455. }
  456. defer resp.Body.Close()
  457. _, err = ioutil.ReadAll(resp.Body)
  458. if err != nil {
  459. fmt.Errorf("DELETE response error: %v\n", err)
  460. }
  461. if verbose {
  462. fmt.Println("delete response Status : ", resp.Status)
  463. fmt.Println("delete response Headers : ", resp.Header)
  464. }
  465. }
  466. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, dataNodeId string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  467. db := needle_map.NewMemDb()
  468. defer db.Close()
  469. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)); err != nil {
  470. return
  471. }
  472. filerFileIdsData, err := os.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  473. if err != nil {
  474. return
  475. }
  476. dataLen := len(filerFileIdsData)
  477. if dataLen%8 != 0 {
  478. return 0, nil, 0, fmt.Errorf("filer data is corrupted")
  479. }
  480. for i := 0; i < len(filerFileIdsData); i += 8 {
  481. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  482. db.Delete(types.NeedleId(fileKey))
  483. inUseCount++
  484. }
  485. var orphanFileCount uint64
  486. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  487. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  488. orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
  489. orphanFileCount++
  490. orphanDataSize += uint64(n.Size)
  491. return nil
  492. })
  493. if orphanFileCount > 0 {
  494. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  495. fmt.Fprintf(writer, "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  496. dataNodeId, volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  497. }
  498. return
  499. }
  500. type VInfo struct {
  501. server pb.ServerAddress
  502. collection string
  503. isEcVolume bool
  504. isReadOnly bool
  505. }
  506. func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[string]map[uint32]VInfo, err error) {
  507. if verbose {
  508. fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
  509. }
  510. volumeIdToServer = make(map[string]map[uint32]VInfo)
  511. // collect topology information
  512. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  513. if err != nil {
  514. return
  515. }
  516. eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  517. for _, diskInfo := range t.DiskInfos {
  518. dataNodeId := t.GetId()
  519. volumeIdToServer[dataNodeId] = make(map[uint32]VInfo)
  520. for _, vi := range diskInfo.VolumeInfos {
  521. volumeIdToServer[dataNodeId][vi.Id] = VInfo{
  522. server: pb.NewServerAddressFromDataNode(t),
  523. collection: vi.Collection,
  524. isEcVolume: false,
  525. isReadOnly: vi.ReadOnly,
  526. }
  527. }
  528. for _, ecShardInfo := range diskInfo.EcShardInfos {
  529. volumeIdToServer[dataNodeId][ecShardInfo.Id] = VInfo{
  530. server: pb.NewServerAddressFromDataNode(t),
  531. collection: ecShardInfo.Collection,
  532. isEcVolume: true,
  533. isReadOnly: true,
  534. }
  535. }
  536. }
  537. })
  538. if verbose {
  539. fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
  540. }
  541. return
  542. }
  543. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
  544. fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
  545. locations, found := c.env.MasterClient.GetLocations(volumeId)
  546. if !found {
  547. return fmt.Errorf("failed to find volume %d locations", volumeId)
  548. }
  549. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  550. var wg sync.WaitGroup
  551. for _, location := range locations {
  552. wg.Add(1)
  553. go func(server pb.ServerAddress, fidList []string) {
  554. defer wg.Done()
  555. if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  556. err = deleteErr
  557. } else if deleteResults != nil {
  558. resultChan <- deleteResults
  559. }
  560. }(location.ServerAddress(), fileIds)
  561. }
  562. wg.Wait()
  563. close(resultChan)
  564. for results := range resultChan {
  565. for _, result := range results {
  566. if result.Error != "" {
  567. fmt.Fprintf(writer, "purge error: %s\n", result.Error)
  568. }
  569. }
  570. }
  571. return
  572. }
  573. func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
  574. return filepath.Join(tempFolder, fmt.Sprintf("%s_%d.idx", dataNodeid, vid))
  575. }
  576. func getFilerFileIdFile(tempFolder string, vid uint32) string {
  577. return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
  578. }
  579. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  580. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  581. dst, err := os.OpenFile(fileName, flags, 0644)
  582. if err != nil {
  583. return nil
  584. }
  585. defer dst.Close()
  586. for {
  587. resp, receiveErr := client.Recv()
  588. if receiveErr == io.EOF {
  589. break
  590. }
  591. if receiveErr != nil {
  592. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  593. }
  594. dst.Write(resp.FileContent)
  595. }
  596. return nil
  597. }