You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

748 lines
24 KiB

3 years ago
3 years ago
5 years ago
5 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
more solid weed mount (#4089) * compare chunks by timestamp * fix slab clearing error * fix test compilation * move oldest chunk to sealed, instead of by fullness * lock on fh.entryViewCache * remove verbose logs * revert slat clearing * less logs * less logs * track write and read by timestamp * remove useless logic * add entry lock on file handle release * use mem chunk only, swap file chunk has problems * comment out code that maybe used later * add debug mode to compare data read and write * more efficient readResolvedChunks with linked list * small optimization * fix test compilation * minor fix on writer * add SeparateGarbageChunks * group chunks into sections * turn off debug mode * fix tests * fix tests * tmp enable swap file chunk * Revert "tmp enable swap file chunk" This reverts commit 985137ec472924e4815f258189f6ca9f2168a0a7. * simple refactoring * simple refactoring * do not re-use swap file chunk. Sealed chunks should not be re-used. * comment out debugging facilities * either mem chunk or swap file chunk is fine now * remove orderedMutex as *semaphore.Weighted not found impactful * optimize size calculation for changing large files * optimize performance to avoid going through the long list of chunks * still problems with swap file chunk * rename * tiny optimization * swap file chunk save only successfully read data * fix * enable both mem and swap file chunk * resolve chunks with range * rename * fix chunk interval list * also change file handle chunk group when adding chunks * pick in-active chunk with time-decayed counter * fix compilation * avoid nil with empty fh.entry * refactoring * rename * rename * refactor visible intervals to *list.List * refactor chunkViews to *list.List * add IntervalList for generic interval list * change visible interval to use IntervalList in generics * cahnge chunkViews to *IntervalList[*ChunkView] * use NewFileChunkSection to create * rename variables * refactor * fix renaming leftover * renaming * renaming * add 
insert interval * interval list adds lock * incrementally add chunks to readers Fixes: 1. set start and stop offset for the value object 2. clone the value object 3. use pointer instead of copy-by-value when passing to interval.Value 4. use insert interval since adding chunk could be out of order * fix tests compilation * fix tests compilation
2 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
5 years ago
4 years ago
4 years ago
4 years ago
3 years ago
5 years ago
5 years ago
3 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
5 months ago
3 years ago
  1. package shell
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "math"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "path"
  15. "path/filepath"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "time"
  20. "github.com/seaweedfs/seaweedfs/weed/filer"
  21. "github.com/seaweedfs/seaweedfs/weed/operation"
  22. "github.com/seaweedfs/seaweedfs/weed/pb"
  23. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  24. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  25. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  26. "github.com/seaweedfs/seaweedfs/weed/storage"
  27. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  28. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  29. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  30. "github.com/seaweedfs/seaweedfs/weed/util"
  31. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  32. "golang.org/x/sync/errgroup"
  33. )
  34. func init() {
  35. Commands = append(Commands, &commandVolumeFsck{})
  36. }
  37. const (
  38. readbufferSize = 16
  39. )
  40. type commandVolumeFsck struct {
  41. env *CommandEnv
  42. writer io.Writer
  43. bucketsPath string
  44. collection *string
  45. volumeIds map[uint32]bool
  46. tempFolder string
  47. verbose *bool
  48. forcePurging *bool
  49. findMissingChunksInFiler *bool
  50. verifyNeedle *bool
  51. }
  52. func (c *commandVolumeFsck) Name() string {
  53. return "volume.fsck"
  54. }
  55. func (c *commandVolumeFsck) Help() string {
  56. return `check all volumes to find entries not used by the filer. It is optional and resource intensive.
  57. Important assumption!!!
  58. the system is all used by one filer.
  59. This command works this way:
  60. 1. collect all file ids from all volumes, as set A
  61. 2. collect all file ids from the filer, as set B
  62. 3. find out the set A subtract B
  63. If -findMissingChunksInFiler is enabled, this works
  64. in a reverse way:
  65. 1. collect all file ids from all volumes, as set A
  66. 2. collect all file ids from the filer, as set B
  67. 3. find out the set B subtract A
  68. `
  69. }
  70. func (c *commandVolumeFsck) HasTag(tag CommandTag) bool {
  71. return tag == ResourceHeavy
  72. }
  73. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  74. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  75. c.verbose = fsckCommand.Bool("v", false, "verbose mode")
  76. c.findMissingChunksInFiler = fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
  77. c.collection = fsckCommand.String("collection", "", "the collection name")
  78. volumeIds := fsckCommand.String("volumeId", "", "comma separated the volume id")
  79. applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup.")
  80. c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
  81. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
  82. tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
  83. cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
  84. modifyTimeAgo := fsckCommand.Duration("modifyTimeAgo", 0, "only include entries after this modify time to check orphan chunks")
  85. c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server")
  86. if err = fsckCommand.Parse(args); err != nil {
  87. return nil
  88. }
  89. if err = commandEnv.confirmIsLocked(args); err != nil {
  90. return
  91. }
  92. c.volumeIds = make(map[uint32]bool)
  93. if *volumeIds != "" {
  94. for _, volumeIdStr := range strings.Split(*volumeIds, ",") {
  95. if volumeIdInt, err := strconv.ParseUint(volumeIdStr, 10, 32); err == nil {
  96. c.volumeIds[uint32(volumeIdInt)] = true
  97. } else {
  98. return fmt.Errorf("parse volumeId string %s to int: %v", volumeIdStr, err)
  99. }
  100. }
  101. }
  102. c.env = commandEnv
  103. c.writer = writer
  104. c.bucketsPath, err = readFilerBucketsPath(commandEnv)
  105. if err != nil {
  106. return fmt.Errorf("read filer buckets path: %v", err)
  107. }
  108. // create a temp folder
  109. c.tempFolder, err = os.MkdirTemp(*tempPath, "sw_fsck")
  110. if err != nil {
  111. return fmt.Errorf("failed to create temp folder: %v", err)
  112. }
  113. if *c.verbose {
  114. fmt.Fprintf(c.writer, "working directory: %s\n", c.tempFolder)
  115. }
  116. defer os.RemoveAll(c.tempFolder)
  117. // collect all volume id locations
  118. dataNodeVolumeIdToVInfo, err := c.collectVolumeIds()
  119. if err != nil {
  120. return fmt.Errorf("failed to collect all volume locations: %v", err)
  121. }
  122. if err != nil {
  123. return fmt.Errorf("read filer buckets path: %v", err)
  124. }
  125. var collectCutoffFromAtNs int64 = 0
  126. if cutoffTimeAgo.Seconds() != 0 {
  127. collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
  128. }
  129. var collectModifyFromAtNs int64 = 0
  130. if modifyTimeAgo.Seconds() != 0 {
  131. collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
  132. }
  133. // collect each volume file ids
  134. eg, gCtx := errgroup.WithContext(context.Background())
  135. _ = gCtx
  136. for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  137. dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
  138. eg.Go(func() error {
  139. for volumeId, vinfo := range volumeIdToVInfo {
  140. if len(c.volumeIds) > 0 {
  141. if _, ok := c.volumeIds[volumeId]; !ok {
  142. delete(volumeIdToVInfo, volumeId)
  143. continue
  144. }
  145. }
  146. if *c.collection != "" && vinfo.collection != *c.collection {
  147. delete(volumeIdToVInfo, volumeId)
  148. continue
  149. }
  150. err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo)
  151. if err != nil {
  152. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  153. }
  154. }
  155. if *c.verbose {
  156. fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
  157. }
  158. return nil
  159. })
  160. }
  161. err = eg.Wait()
  162. if err != nil {
  163. fmt.Fprintf(c.writer, "got error: %v", err)
  164. return err
  165. }
  166. if *c.findMissingChunksInFiler {
  167. // collect all filer file ids and paths
  168. if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectModifyFromAtNs, collectCutoffFromAtNs); err != nil {
  169. return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
  170. }
  171. for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  172. // for each volume, check filer file ids
  173. if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, dataNodeId, *applyPurging); err != nil {
  174. return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
  175. }
  176. }
  177. } else {
  178. // collect all filer file ids
  179. if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0, 0); err != nil {
  180. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  181. }
  182. // volume file ids subtract filer file ids
  183. if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil {
  184. return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
  185. }
  186. }
  187. return nil
  188. }
  189. func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, collectModifyFromAtNs int64, cutoffFromAtNs int64) error {
  190. if *c.verbose {
  191. fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath())
  192. }
  193. files := make(map[uint32]*os.File)
  194. for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
  195. for vid := range volumeIdToServer {
  196. if _, ok := files[vid]; ok {
  197. continue
  198. }
  199. dst, openErr := os.OpenFile(getFilerFileIdFile(c.tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  200. if openErr != nil {
  201. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(c.tempFolder, vid), openErr)
  202. }
  203. files[vid] = dst
  204. }
  205. }
  206. defer func() {
  207. for _, f := range files {
  208. f.Close()
  209. }
  210. }()
  211. return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
  212. func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  213. if *c.verbose && entry.Entry.IsDirectory {
  214. fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
  215. }
  216. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
  217. if resolveErr != nil {
  218. return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
  219. }
  220. dataChunks = append(dataChunks, manifestChunks...)
  221. for _, chunk := range dataChunks {
  222. if cutoffFromAtNs != 0 && chunk.ModifiedTsNs > cutoffFromAtNs {
  223. continue
  224. }
  225. if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs {
  226. continue
  227. }
  228. outputChan <- &Item{
  229. vid: chunk.Fid.VolumeId,
  230. fileKey: chunk.Fid.FileKey,
  231. cookie: chunk.Fid.Cookie,
  232. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  233. }
  234. }
  235. return nil
  236. },
  237. func(outputChan chan interface{}) {
  238. buffer := make([]byte, readbufferSize)
  239. for item := range outputChan {
  240. i := item.(*Item)
  241. if f, ok := files[i.vid]; ok {
  242. util.Uint64toBytes(buffer, i.fileKey)
  243. util.Uint32toBytes(buffer[8:], i.cookie)
  244. util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
  245. f.Write(buffer)
  246. f.Write([]byte(i.path))
  247. } else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
  248. fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
  249. if purgeAbsent {
  250. fmt.Printf("deleting path %s after volume not found", i.path)
  251. c.httpDelete(i.path)
  252. }
  253. }
  254. }
  255. })
  256. }
  257. func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, dataNodeId string, applyPurging bool) error {
  258. for volumeId, vinfo := range volumeIdToVInfo {
  259. checkErr := c.oneVolumeFileIdsCheckOneVolume(dataNodeId, volumeId, applyPurging)
  260. if checkErr != nil {
  261. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  262. }
  263. }
  264. return nil
  265. }
  266. func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error {
  267. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  268. volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
  269. isSeveralReplicas := make(map[uint32]bool)
  270. isEcVolumeReplicas := make(map[uint32]bool)
  271. isReadOnlyReplicas := make(map[uint32]bool)
  272. serverReplicas := make(map[uint32][]pb.ServerAddress)
  273. for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
  274. for volumeId, vinfo := range volumeIdToVInfo {
  275. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom)
  276. if checkErr != nil {
  277. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  278. }
  279. isSeveralReplicas[volumeId] = false
  280. if _, found := volumeIdOrphanFileIds[volumeId]; !found {
  281. volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
  282. } else {
  283. isSeveralReplicas[volumeId] = true
  284. }
  285. for _, fid := range orphanFileIds {
  286. if isSeveralReplicas[volumeId] {
  287. if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
  288. continue
  289. }
  290. }
  291. volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
  292. }
  293. totalInUseCount += inUseCount
  294. totalOrphanChunkCount += uint64(len(orphanFileIds))
  295. totalOrphanDataSize += orphanDataSize
  296. if *c.verbose {
  297. for _, fid := range orphanFileIds {
  298. fmt.Fprintf(c.writer, "%s:%s\n", vinfo.collection, fid)
  299. }
  300. }
  301. isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
  302. if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
  303. isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
  304. }
  305. serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
  306. }
  307. for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
  308. if !(applyPurging && len(orphanReplicaFileIds) > 0) {
  309. continue
  310. }
  311. orphanFileIds := []string{}
  312. for fid, foundInAllReplicas := range orphanReplicaFileIds {
  313. if !isSeveralReplicas[volumeId] || *c.forcePurging || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
  314. orphanFileIds = append(orphanFileIds, fid)
  315. }
  316. }
  317. if !(len(orphanFileIds) > 0) {
  318. continue
  319. }
  320. if *c.verbose {
  321. fmt.Fprintf(c.writer, "purging process for volume %d.\n", volumeId)
  322. }
  323. if isEcVolumeReplicas[volumeId] {
  324. fmt.Fprintf(c.writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
  325. continue
  326. }
  327. for _, server := range serverReplicas[volumeId] {
  328. needleVID := needle.VolumeId(volumeId)
  329. if isReadOnlyReplicas[volumeId] {
  330. err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
  331. if err != nil {
  332. return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
  333. }
  334. fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
  335. defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
  336. fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
  337. }
  338. if *c.verbose {
  339. fmt.Fprintf(c.writer, "purging files from volume %d\n", volumeId)
  340. }
  341. if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds); err != nil {
  342. return fmt.Errorf("purging volume %d: %v", volumeId, err)
  343. }
  344. }
  345. }
  346. }
  347. if !applyPurging {
  348. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  349. fmt.Fprintf(c.writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  350. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  351. fmt.Fprintf(c.writer, "This could be normal if multiple filers or no filers are used.\n")
  352. }
  353. if totalOrphanChunkCount == 0 {
  354. fmt.Fprintf(c.writer, "no orphan data\n")
  355. }
  356. return nil
  357. }
  358. func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error {
  359. if *c.verbose {
  360. fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  361. }
  362. return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
  363. func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  364. ext := ".idx"
  365. if vinfo.isEcVolume {
  366. ext = ".ecx"
  367. }
  368. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  369. VolumeId: volumeId,
  370. Ext: ext,
  371. CompactionRevision: math.MaxUint32,
  372. StopOffset: math.MaxInt64,
  373. Collection: vinfo.collection,
  374. IsEcVolume: vinfo.isEcVolume,
  375. IgnoreSourceFileNotFound: false,
  376. })
  377. if err != nil {
  378. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  379. }
  380. var buf bytes.Buffer
  381. for {
  382. resp, err := copyFileClient.Recv()
  383. if errors.Is(err, io.EOF) {
  384. break
  385. }
  386. if err != nil {
  387. return err
  388. }
  389. buf.Write(resp.FileContent)
  390. }
  391. idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
  392. err = writeToFile(buf.Bytes(), idxFilename)
  393. if err != nil {
  394. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
  395. }
  396. return nil
  397. })
  398. }
  399. type Item struct {
  400. vid uint32
  401. fileKey uint64
  402. cookie uint32
  403. path util.FullPath
  404. }
  405. func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleId types.NeedleId, itemPath util.FullPath)) error {
  406. fp, err := os.Open(getFilerFileIdFile(c.tempFolder, volumeId))
  407. if err != nil {
  408. return err
  409. }
  410. defer fp.Close()
  411. br := bufio.NewReader(fp)
  412. buffer := make([]byte, readbufferSize)
  413. var readSize int
  414. var readErr error
  415. item := &Item{vid: volumeId}
  416. for {
  417. readSize, readErr = io.ReadFull(br, buffer)
  418. if errors.Is(readErr, io.EOF) {
  419. break
  420. }
  421. if readErr != nil {
  422. return readErr
  423. }
  424. if readSize != readbufferSize {
  425. return fmt.Errorf("readSize mismatch")
  426. }
  427. item.fileKey = util.BytesToUint64(buffer[:8])
  428. item.cookie = util.BytesToUint32(buffer[8:12])
  429. pathSize := util.BytesToUint32(buffer[12:16])
  430. pathBytes := make([]byte, int(pathSize))
  431. n, err := io.ReadFull(br, pathBytes)
  432. if err != nil {
  433. fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
  434. }
  435. if n != int(pathSize) {
  436. fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
  437. }
  438. item.path = util.FullPath(pathBytes)
  439. needleId := types.NeedleId(item.fileKey)
  440. fn(needleId, item.path)
  441. }
  442. return nil
  443. }
  444. func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(dataNodeId string, volumeId uint32, applyPurging bool) (err error) {
  445. if *c.verbose {
  446. fmt.Fprintf(c.writer, "find missing file chunks in dataNodeId %s volume %d ...\n", dataNodeId, volumeId)
  447. }
  448. db := needle_map.NewMemDb()
  449. defer db.Close()
  450. if err = db.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
  451. return
  452. }
  453. if err = c.readFilerFileIdFile(volumeId, func(needleId types.NeedleId, itemPath util.FullPath) {
  454. if _, found := db.Get(needleId); !found {
  455. fmt.Fprintf(c.writer, "%s\n", itemPath)
  456. if applyPurging {
  457. c.httpDelete(itemPath)
  458. }
  459. }
  460. }); err != nil {
  461. return
  462. }
  463. return nil
  464. }
  465. func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
  466. req, err := http.NewRequest(http.MethodDelete, "", nil)
  467. req.URL = &url.URL{
  468. Scheme: "http",
  469. Host: c.env.option.FilerAddress.ToHttpAddress(),
  470. Path: string(path),
  471. }
  472. if *c.verbose {
  473. fmt.Fprintf(c.writer, "full HTTP delete request to be sent: %v\n", req)
  474. }
  475. if err != nil {
  476. fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err)
  477. }
  478. resp, err := util_http.GetGlobalHttpClient().Do(req)
  479. if err != nil {
  480. fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err)
  481. }
  482. defer resp.Body.Close()
  483. _, err = io.ReadAll(resp.Body)
  484. if err != nil {
  485. fmt.Fprintf(c.writer, "DELETE response error: %v\n", err)
  486. }
  487. if *c.verbose {
  488. fmt.Fprintln(c.writer, "delete response Status : ", resp.Status)
  489. fmt.Fprintln(c.writer, "delete response Headers : ", resp.Header)
  490. }
  491. }
  492. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  493. volumeFileIdDb := needle_map.NewMemDb()
  494. defer volumeFileIdDb.Close()
  495. if err = volumeFileIdDb.LoadFromIdx(getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)); err != nil {
  496. err = fmt.Errorf("failed to LoadFromIdx %+v", err)
  497. return
  498. }
  499. if err = c.readFilerFileIdFile(volumeId, func(filerNeedleId types.NeedleId, itemPath util.FullPath) {
  500. inUseCount++
  501. if *c.verifyNeedle {
  502. if needleValue, ok := volumeFileIdDb.Get(filerNeedleId); ok && !needleValue.Size.IsDeleted() {
  503. if _, err := readNeedleStatus(c.env.option.GrpcDialOption, vinfo.server, volumeId, *needleValue); err != nil {
  504. // files may be deleted during copying filesIds
  505. if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
  506. fmt.Fprintf(c.writer, "failed to read %d:%s needle status of file %s: %+v\n",
  507. volumeId, filerNeedleId.String(), itemPath, err)
  508. if *c.forcePurging {
  509. return
  510. }
  511. }
  512. }
  513. }
  514. }
  515. if err = volumeFileIdDb.Delete(filerNeedleId); err != nil && *c.verbose {
  516. fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, filerNeedleId, err)
  517. }
  518. }); err != nil {
  519. err = fmt.Errorf("failed to readFilerFileIdFile %+v", err)
  520. return
  521. }
  522. var orphanFileCount uint64
  523. if err = volumeFileIdDb.AscendingVisit(func(n needle_map.NeedleValue) error {
  524. if n.Size.IsDeleted() {
  525. return nil
  526. }
  527. if cutoffFrom > 0 || modifyFrom > 0 {
  528. return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
  529. func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  530. resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
  531. VolumeId: volumeId,
  532. NeedleId: types.NeedleIdToUint64(n.Key),
  533. Offset: n.Offset.ToActualOffset(),
  534. Size: int32(n.Size),
  535. })
  536. if err != nil {
  537. return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err)
  538. }
  539. if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
  540. orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
  541. orphanFileCount++
  542. orphanDataSize += uint64(n.Size)
  543. }
  544. return nil
  545. })
  546. } else {
  547. orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
  548. orphanFileCount++
  549. orphanDataSize += uint64(n.Size)
  550. }
  551. return nil
  552. }); err != nil {
  553. err = fmt.Errorf("failed to AscendingVisit %+v", err)
  554. return
  555. }
  556. if orphanFileCount > 0 {
  557. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  558. fmt.Fprintf(c.writer, "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  559. dataNodeId, volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  560. }
  561. return
  562. }
  563. type VInfo struct {
  564. server pb.ServerAddress
  565. collection string
  566. isEcVolume bool
  567. isReadOnly bool
  568. }
  569. func (c *commandVolumeFsck) collectVolumeIds() (volumeIdToServer map[string]map[uint32]VInfo, err error) {
  570. if *c.verbose {
  571. fmt.Fprintf(c.writer, "collecting volume id and locations from master ...\n")
  572. }
  573. volumeIdToServer = make(map[string]map[uint32]VInfo)
  574. // collect topology information
  575. topologyInfo, _, err := collectTopologyInfo(c.env, 0)
  576. if err != nil {
  577. return
  578. }
  579. eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  580. var volumeCount, ecShardCount int
  581. dataNodeId := t.GetId()
  582. for _, diskInfo := range t.DiskInfos {
  583. if _, ok := volumeIdToServer[dataNodeId]; !ok {
  584. volumeIdToServer[dataNodeId] = make(map[uint32]VInfo)
  585. }
  586. for _, vi := range diskInfo.VolumeInfos {
  587. volumeIdToServer[dataNodeId][vi.Id] = VInfo{
  588. server: pb.NewServerAddressFromDataNode(t),
  589. collection: vi.Collection,
  590. isEcVolume: false,
  591. isReadOnly: vi.ReadOnly,
  592. }
  593. volumeCount += 1
  594. }
  595. for _, ecShardInfo := range diskInfo.EcShardInfos {
  596. volumeIdToServer[dataNodeId][ecShardInfo.Id] = VInfo{
  597. server: pb.NewServerAddressFromDataNode(t),
  598. collection: ecShardInfo.Collection,
  599. isEcVolume: true,
  600. isReadOnly: true,
  601. }
  602. ecShardCount += 1
  603. }
  604. }
  605. if *c.verbose {
  606. fmt.Fprintf(c.writer, "dn %+v collected %d volumes and %d ec shards.\n", dataNodeId, volumeCount, ecShardCount)
  607. }
  608. })
  609. return
  610. }
  611. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string) (err error) {
  612. fmt.Fprintf(c.writer, "purging orphan data for volume %d...\n", volumeId)
  613. locations, found := c.env.MasterClient.GetLocations(volumeId)
  614. if !found {
  615. return fmt.Errorf("failed to find volume %d locations", volumeId)
  616. }
  617. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  618. var wg sync.WaitGroup
  619. for _, location := range locations {
  620. wg.Add(1)
  621. go func(server pb.ServerAddress, fidList []string) {
  622. defer wg.Done()
  623. if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  624. err = deleteErr
  625. } else if deleteResults != nil {
  626. resultChan <- deleteResults
  627. }
  628. }(location.ServerAddress(), fileIds)
  629. }
  630. wg.Wait()
  631. close(resultChan)
  632. for results := range resultChan {
  633. for _, result := range results {
  634. if result.Error != "" {
  635. fmt.Fprintf(c.writer, "purge error: %s\n", result.Error)
  636. }
  637. }
  638. }
  639. return
  640. }
  641. func (c *commandVolumeFsck) getCollectFilerFilePath() string {
  642. if *c.collection != "" {
  643. return fmt.Sprintf("%s/%s", c.bucketsPath, *c.collection)
  644. }
  645. return "/"
  646. }
  647. func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
  648. return filepath.Join(tempFolder, fmt.Sprintf("%s_%d.idx", dataNodeid, vid))
  649. }
  650. func getFilerFileIdFile(tempFolder string, vid uint32) string {
  651. return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
  652. }
  653. func writeToFile(bytes []byte, fileName string) error {
  654. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  655. dst, err := os.OpenFile(fileName, flags, 0644)
  656. if err != nil {
  657. return nil
  658. }
  659. defer dst.Close()
  660. dst.Write(bytes)
  661. return nil
  662. }