diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index d6c9e796f..400e96fe7 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -1,6 +1,7 @@ package shell import ( + "bufio" "context" "flag" "fmt" @@ -44,6 +45,12 @@ func (c *commandVolumeFsck) Help() string { 2. collect all file ids from the filer, as set B 3. find out the set A subtract B + If -findMissingChunksInFiler is enabled, this works + in a reverse way: + 1. collect all file ids from all volumes, as set A + 2. collect all file ids from the filer, as set B + 3. find out the set B subtract A + ` } @@ -55,6 +62,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) verbose := fsckCommand.Bool("v", false, "verbose mode") + findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see help volume.fsck") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, " delete data not referenced by the filer") if err = fsckCommand.Parse(args); err != nil { return nil @@ -86,21 +94,105 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. } } - // collect all filer file ids - if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { - return fmt.Errorf("failed to collect file ids from filer: %v", err) + if *findMissingChunksInFiler { + // collect all filer file ids and paths + if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) + } + // for each volume, check filer file ids + if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) + } + } else { + // collect all filer file ids + if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { + return fmt.Errorf("failed to collect file ids from filer: %v", err) + } + // volume file ids substract filer file ids + if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) + } } - // volume file ids substract filer file ids - err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, verbose, applyPurging) + return nil +} + +func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error { + + if verbose { + fmt.Fprintf(writer, "checking each file from filer ...\n") + } + + files := make(map[uint32]*os.File) + for vid := range volumeIdToServer { + dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr) + } + files[vid] = dst + } + defer func() { + for _, f := range files { + f.Close() + } + }() - return err + type Item struct { + vid uint32 + fileKey uint64 + cookie uint32 + path util.FullPath + } + return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) { + buffer := make([]byte, 8) + for item := range outputChan { + i := item.(*Item) + if f, ok := files[i.vid]; ok { + util.Uint64toBytes(buffer, i.fileKey) + f.Write(buffer) + util.Uint32toBytes(buffer, i.cookie) + util.Uint32toBytes(buffer[4:], uint32(len(i.path))) + f.Write(buffer) + f.Write([]byte(i.path)) + } else { + fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) + } + } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + if resolveErr != nil { + return nil + } + dChunks = append(dChunks, mChunks...) + for _, chunk := range dChunks { + outputChan <- &Item{ + vid: chunk.Fid.VolumeId, + fileKey: chunk.Fid.FileKey, + cookie: chunk.Fid.Cookie, + path: util.NewFullPath(entry.Dir, entry.Entry.Name), + } + } + return nil + }) + + return nil } -func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose *bool, applyPurging *bool) (error) { +func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error { + + for volumeId, vinfo := range volumeIdToVInfo { + checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose) + if checkErr != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) + } + } + return nil +} + +func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error { var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 for volumeId, vinfo := range volumeIdToVInfo { - inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose) + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } @@ -108,7 +200,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u totalOrphanChunkCount += uint64(len(orphanFileIds)) totalOrphanDataSize += orphanDataSize - if *verbose { + if verbose { for _, fid := range orphanFileIds { fmt.Fprintf(writer, "%sxxxxxxxx\n", fid) } @@ -223,6 +315,59 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer }) } +func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) { + + db := needle_map.NewMemDb() + defer db.Close() + + if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil { + return + } + + file := getFilerFileIdFile(tempFolder, volumeId) + fp, err := os.Open(file) + if err != nil { + return + } + defer fp.Close() + + type Item struct { + fileKey uint64 + cookie uint32 + path util.FullPath + } + + br := bufio.NewReader(fp) + buffer := make([]byte, 16) + item := &Item{} + var readSize int + for { + readSize, err = br.Read(buffer) + if err != nil || readSize != 16 { + if err == io.EOF { + return nil + } else { + break + } + } + + item.fileKey = util.BytesToUint64(buffer[:8]) + item.cookie = util.BytesToUint32(buffer[8:12]) + pathSize := util.BytesToUint32(buffer[12:16]) + pathBytes := make([]byte, int(pathSize)) + _, err = br.Read(pathBytes) + item.path = util.FullPath(string(pathBytes)) + + if _, found := db.Get(types.NeedleId(item.fileKey)); !found { + fmt.Fprintf(writer, "%d,%x%08x in %s not found\n", volumeId, item.fileKey, item.cookie, item.path) + } + + } + + return + +} + func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { db := needle_map.NewMemDb()