@ -13,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
@ -26,6 +27,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -35,9 +37,21 @@ func init() {
Commands = append ( Commands , & commandVolumeFsck { } )
}
const (
readbufferSize = 16
)
type commandVolumeFsck struct {
env * CommandEnv
forcePurging * bool
env * CommandEnv
writer io . Writer
bucketsPath string
collection * string
volumeIds map [ uint32 ] bool
tempFolder string
verbose * bool
forcePurging * bool
findMissingChunksInFiler * bool
verifyNeedle * bool
}
func ( c * commandVolumeFsck ) Name ( ) string {
@ -67,15 +81,16 @@ func (c *commandVolumeFsck) Help() string {
func ( c * commandVolumeFsck ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
fsckCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
verbose : = fsckCommand . Bool ( "v" , false , "verbose mode" )
findMissingChunksInFiler : = fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
findMissingChunksInFilerPath : = fsckCommand . String ( "findMissingChunksInFilerPath " , "/ " , "used together with findMissingChunksInFiler " )
findMissingChunksInVolumeId := fsckCommand . Int ( "findMissingChunksInVolumeId" , 0 , "used together with findMissingChunksInFiler ")
c . verbose = fsckCommand . Bool ( "v" , false , "verbose mode" )
c . findMissingChunksInFiler = fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
c . collection = fsckCommand . String ( "collection " , "" , "the collection name " )
volumeIds := fsckCommand . String ( "volumeId" , "" , "comma separated the volume id ")
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup." )
c . forcePurging = fsckCommand . Bool ( "forcePurging" , false , "delete missing data from volumes in one replica used together with applyPurging" )
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
tempPath := fsckCommand . String ( "tempPath" , path . Join ( os . TempDir ( ) ) , "path for temporary idx files" )
cutoffTimeAgo := fsckCommand . Duration ( "cutoffTimeAgo" , 5 * time . Minute , "only include entries on volume servers before this cutoff time to check orphan chunks" )
c . verifyNeedle = fsckCommand . Bool ( "verifyNeedles" , false , "check needles status from volume server" )
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
@ -84,78 +99,87 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
return
}
c . volumeIds = make ( map [ uint32 ] bool )
if * volumeIds != "" {
for _ , volumeIdStr := range strings . Split ( * volumeIds , "," ) {
if volumeIdInt , err := strconv . ParseUint ( volumeIdStr , 10 , 32 ) ; err == nil {
c . volumeIds [ uint32 ( volumeIdInt ) ] = true
} else {
return fmt . Errorf ( "parse volumeId string %s to int: %v" , volumeIdStr , err )
}
}
}
c . env = commandEnv
c . writer = writer
c . bucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
// create a temp folder
tempFolder , err := os . MkdirTemp ( * tempPath , "sw_fsck" )
c . tempFolder , err = os . MkdirTemp ( * tempPath , "sw_fsck" )
if err != nil {
return fmt . Errorf ( "failed to create temp folder: %v" , err )
}
if * verbose {
fmt . Fprintf ( writer , "working directory: %s\n" , tempFolder )
if * c . verbose {
fmt . Fprintf ( c . writer , "working directory: %s\n" , c . tempFolder )
}
defer os . RemoveAll ( tempFolder )
defer os . RemoveAll ( c . tempFolder )
// collect all volume id locations
dataNodeVolumeIdToVInfo , err := c . collectVolumeIds ( commandEnv , * verbose , writer )
dataNodeVolumeIdToVInfo , err := c . collectVolumeIds ( )
if err != nil {
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
}
isBucketsPath := false
var fillerBucketsPath string
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "/" {
fillerBucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
if strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath ) {
isBucketsPath = true
}
}
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
collectMtime := time . Now ( ) . Unix ( )
collectCutoffFromAtNs := time . Now ( ) . UnixNano ( )
// collect each volume file ids
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
if * findMissingChunksInVolumeId > 0 && uint32 ( * findMissingChunksInVolumeId ) != volumeId {
delete ( volumeIdToVInfo , volumeId )
continue
if len ( c . volumeIds ) > 0 {
if _ , ok := c . volumeIds [ volumeId ] ; ! ok {
delete ( volumeIdToVInfo , volumeId )
continue
}
}
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
if * c . collection != "" && vinfo . collection != * c . collection {
delete ( volumeIdToVInfo , volumeId )
continue
}
cutoffFrom := time . Now ( ) . Add ( - * cutoffTimeAgo ) . UnixNano ( )
err = c . collectOneVolumeFileIds ( tempFolder , dataNodeId , volumeId , vinfo , * verbose , writer , uint64 ( cutoffFrom ) )
err = c . collectOneVolumeFileIds ( dataNodeId , volumeId , vinfo , uint64 ( cutoffFrom ) )
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
}
}
if * c . verbose {
fmt . Fprintf ( c . writer , "dn %+v filtred %d volumes and locations.\n" , dataNodeId , len ( dataNodeVolumeIdToVInfo [ dataNodeId ] ) )
}
}
if * findMissingChunksInFiler {
if * c . findMissingChunksInFiler {
// collect all filer file ids and paths
if err = c . collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo , tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent , collectMtime ) ; err != nil {
if err = c . collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo , * purgeAbsent , collectCutoffFromAtNs ) ; err != nil {
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
}
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , dataNodeId , writer , * verbose , * applyPurging ) ; err != nil {
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , dataNodeId , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
}
}
} else {
// collect all filer file ids
if err = c . collectFilerFileIds ( dataNodeVolumeIdToVInfo , tempFolder , writer , * verbose ) ; err != nil {
if err = c . collectFilerFileIdAndPath s ( dataNodeVolumeIdToVInfo , false , 0 ) ; err != nil {
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
}
// volume file ids subtract filer file ids
if err = c . findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo , tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
if err = c . findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
}
}
@ -163,10 +187,9 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool , collectMtime int64 ) error {
if verbose {
fmt . Fprintf ( writer , "checking each file from filer ...\n" )
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , purgeAbsent bool , cutoffFromAtNs int64 ) error {
if * c . verbose {
fmt . Fprintf ( c . writer , "checking each file from filer path %s...\n" , c . getCollectFilerFilePath ( ) )
}
files := make ( map [ uint32 ] * os . File )
@ -175,9 +198,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
if _ , ok := files [ vid ] ; ok {
continue
}
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
dst , openErr := os . OpenFile ( getFilerFileIdFile ( c . tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( c . tempFolder , vid ) , openErr )
}
files [ vid ] = dst
}
@ -188,60 +211,54 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
}
} ( )
type Item struct {
vid uint32
fileKey uint64
cookie uint32
path util . FullPath
}
return doTraverseBfsAndSaving ( c . env , nil , filerPath , false , func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
if verbose && entry . Entry . IsDirectory {
fmt . Fprintf ( writer , "checking directory %s\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) )
}
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . Chunks , 0 , math . MaxInt64 )
if resolveErr != nil {
return nil
}
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
if chunk . Mtime > collectMtime {
continue
return doTraverseBfsAndSaving ( c . env , nil , c . getCollectFilerFilePath ( ) , false ,
func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
if * c . verbose && entry . Entry . IsDirectory {
fmt . Fprintf ( c . writer , "checking directory %s\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) )
}
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
cookie : chunk . Fid . Cookie ,
path : util . NewFullPath ( entry . Dir , entry . Entry . Name ) ,
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . Chunks , 0 , math . MaxInt64 )
if resolveErr != nil {
return fmt . Errorf ( "failed to ResolveChunkManifest: %+v" , resolveErr )
}
}
return nil
} , func ( outputChan chan interface { } ) {
buffer := make ( [ ] byte , 16 )
for item := range outputChan {
i := item . ( * Item )
if f , ok := files [ i . vid ] ; ok {
util . Uint64toBytes ( buffer , i . fileKey )
util . Uint32toBytes ( buffer [ 8 : ] , i . cookie )
util . Uint32toBytes ( buffer [ 12 : ] , uint32 ( len ( i . path ) ) )
f . Write ( buffer )
f . Write ( [ ] byte ( i . path ) )
// fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
} else {
fmt . Fprintf ( writer , "%d,%x%08x %s volume not found\n" , i . vid , i . fileKey , i . cookie , i . path )
if purgeAbsent {
fmt . Printf ( "deleting path %s after volume not found" , i . path )
c . httpDelete ( i . path , verbose )
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
if cutoffFromAtNs != 0 && chunk . ModifiedTsNs > cutoffFromAtNs {
continue
}
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
cookie : chunk . Fid . Cookie ,
path : util . NewFullPath ( entry . Dir , entry . Entry . Name ) ,
}
}
}
} )
return nil
} ,
func ( outputChan chan interface { } ) {
buffer := make ( [ ] byte , readbufferSize )
for item := range outputChan {
i := item . ( * Item )
if f , ok := files [ i . vid ] ; ok {
util . Uint64toBytes ( buffer , i . fileKey )
util . Uint32toBytes ( buffer [ 8 : ] , i . cookie )
util . Uint32toBytes ( buffer [ 12 : ] , uint32 ( len ( i . path ) ) )
f . Write ( buffer )
f . Write ( [ ] byte ( i . path ) )
} else if * c . findMissingChunksInFiler && len ( c . volumeIds ) == 0 {
fmt . Fprintf ( c . writer , "%d,%x%08x %s volume not found\n" , i . vid , i . fileKey , i . cookie , i . path )
if purgeAbsent {
fmt . Printf ( "deleting path %s after volume not found" , i . path )
c . httpDelete ( i . path )
}
}
}
} )
}
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , dataNodeId string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , dataNodeId string , applyPurging bool ) error {
for volumeId , vinfo := range volumeIdToVInfo {
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , dataNodeId , volumeId , writer , verbose , applyPurging )
checkErr := c . oneVolumeFileIdsCheckOneVolume ( dataNodeId , volumeId , applyPurging )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
@ -249,7 +266,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
return nil
}
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , applyPurging bool ) error {
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
volumeIdOrphanFileIds := make ( map [ uint32 ] map [ string ] bool )
@ -259,7 +276,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
serverReplicas := make ( map [ uint32 ] [ ] pb . ServerAddress )
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , dataNodeId , volumeId , writer , verbose )
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( dataNodeId , volumeId , & vinfo )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
@ -282,9 +299,9 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
if verbose {
if * c . verbose {
for _ , fid := range orphanFileIds {
fmt . Fprintf ( writer , "%s\n" , fid )
fmt . Fprintf ( c . writer , "%s:%s \n" , vinfo . collection , fid )
}
}
isEcVolumeReplicas [ volumeId ] = vinfo . isEcVolume
@ -307,12 +324,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
if ! ( len ( orphanFileIds ) > 0 ) {
continue
}
if verbose {
fmt . Fprintf ( writer , "purging process for volume %d.\n" , volumeId )
if * c . verbose {
fmt . Fprintf ( c . writer , "purging process for volume %d.\n" , volumeId )
}
if isEcVolumeReplicas [ volumeId ] {
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
fmt . Fprintf ( c . writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
continue
}
for _ , server := range serverReplicas [ volumeId ] {
@ -323,17 +340,17 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
}
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , server )
fmt . Fprintf ( c . writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , false )
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , server )
fmt . Fprintf ( c . writer , "marked %d on server %v writable for forced purge\n" , volumeId , server )
}
if verbose {
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
if * c . verbose {
fmt . Fprintf ( c . writer , "purging files from volume %d\n" , volumeId )
}
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds , writer ) ; err != nil {
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds ) ; err != nil {
return fmt . Errorf ( "purging volume %d: %v" , volumeId , err )
}
}
@ -342,203 +359,160 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
if ! applyPurging {
pct := float64 ( totalOrphanChunkCount * 100 ) / ( float64 ( totalOrphanChunkCount + totalInUseCount ) )
fmt . Fprintf ( writer , "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
fmt . Fprintf ( c . writer , "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
totalOrphanChunkCount + totalInUseCount , totalOrphanChunkCount , pct , totalOrphanDataSize )
fmt . Fprintf ( writer , "This could be normal if multiple filers or no filers are used.\n" )
fmt . Fprintf ( c . writer , "This could be normal if multiple filers or no filers are used.\n" )
}
if totalOrphanChunkCount == 0 {
fmt . Fprintf ( writer , "no orphan data\n" )
//return nil
fmt . Fprintf ( c . writer , "no orphan data\n" )
}
return nil
}
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer , cutoffFrom uint64 ) error {
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( dataNodeId string , volumeId uint32 , vinfo VInfo , cutoffFrom uint64 ) error {
if verbose {
fmt . Fprintf ( writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
if * c . verbose {
fmt . Fprintf ( c . writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
}
return operation . WithVolumeServerClient ( false , vinfo . server , c . env . option . GrpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
ext := ".idx"
if vinfo . isEcVolume {
ext = ".ecx"
}
copyFileClient , err := volumeServerClient . CopyFile ( context . Background ( ) , & volume_server_pb . CopyFileRequest {
VolumeId : volumeId ,
Ext : ext ,
CompactionRevision : math . MaxUint32 ,
StopOffset : math . MaxInt64 ,
Collection : vinfo . collection ,
IsEcVolume : vinfo . isEcVolume ,
IgnoreSourceFileNotFound : false ,
} )
if err != nil {
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
}
var buf bytes . Buffer
for {
resp , err := copyFileClient . Recv ( )
if errors . Is ( err , io . EOF ) {
break
return operation . WithVolumeServerClient ( false , vinfo . server , c . env . option . GrpcDialOption ,
func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
ext := ".idx"
if vinfo . isEcVolume {
ext = ".ecx"
}
copyFileClient , err := volumeServerClient . CopyFile ( context . Background ( ) , & volume_server_pb . CopyFileRequest {
VolumeId : volumeId ,
Ext : ext ,
CompactionRevision : math . MaxUint32 ,
StopOffset : math . MaxInt64 ,
Collection : vinfo . collection ,
IsEcVolume : vinfo . isEcVolume ,
IgnoreSourceFileNotFound : false ,
} )
if err != nil {
return err
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
}
buf . Write ( resp . FileContent )
}
fileredBuf := filterDeletedNeedleFromIdx ( buf . Bytes ( ) )
if vinfo . isReadOnly == false {
index , err := idx . FirstInvalidIndex ( fileredBuf . Bytes ( ) , func ( key types . NeedleId , offset types . Offset , size types . Size ) ( bool , error ) {
resp , err := volumeServerClient . ReadNeedleMeta ( context . Background ( ) , & volume_server_pb . ReadNeedleMetaRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( key ) ,
Offset : offset . ToActualOffset ( ) ,
Size : int32 ( size ) ,
} )
var buf bytes . Buffer
for {
resp , err := copyFileClient . Recv ( )
if errors . Is ( err , io . EOF ) {
break
}
if err != nil {
return false , fmt . Errorf ( "to read needle meta with id %d from volume %d with error %v" , key , volumeId , err )
return err
}
return resp . LastModified <= cutoffFrom , nil
} )
buf . Write ( resp . FileContent )
}
if vinfo . isReadOnly == false {
index , err := idx . FirstInvalidIndex ( buf . Bytes ( ) ,
func ( key types . NeedleId , offset types . Offset , size types . Size ) ( bool , error ) {
resp , err := volumeServerClient . ReadNeedleMeta ( context . Background ( ) , & volume_server_pb . ReadNeedleMetaRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( key ) ,
Offset : offset . ToActualOffset ( ) ,
Size : int32 ( size ) ,
} )
if err != nil {
return false , fmt . Errorf ( "to read needle meta with id %d from volume %d with error %v" , key , volumeId , err )
}
return resp . AppendAtNs <= cutoffFrom , nil
} )
if err != nil {
fmt . Fprintf ( c . writer , "Failed to search for last valid index on volume %d with error %v" , volumeId , err )
} else {
buf . Truncate ( index * types . NeedleMapEntrySize )
}
}
idxFilename := getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId )
err = writeToFile ( buf . Bytes ( ) , idxFilename )
if err != nil {
fmt . Fprintf ( writer , "Failed to search for last valid index on volume %d with error %v" , volumeId , err )
} else {
fileredBuf . Truncate ( index * types . NeedleMapEntrySize )
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
}
}
idxFilename := getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId )
err = writeToFile ( fileredBuf . Bytes ( ) , idxFilename )
if err != nil {
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
}
return nil
} )
return nil
} )
}
func ( c * commandVolumeFsck ) collectFilerFileIds ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
if verbose {
fmt . Fprintf ( writer , "collecting file ids from filer ...\n" )
}
files := make ( map [ uint32 ] * os . File )
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
files [ vid ] = dst
}
}
defer func ( ) {
for _ , f := range files {
f . Close ( )
}
} ( )
type Item struct {
vid uint32
fileKey uint64
}
return doTraverseBfsAndSaving ( c . env , nil , "/" , false , func ( entry * filer_pb . FullEntry , outputChan chan interface { } ) ( err error ) {
dataChunks , manifestChunks , resolveErr := filer . ResolveChunkManifest ( filer . LookupFn ( c . env ) , entry . Entry . Chunks , 0 , math . MaxInt64 )
if resolveErr != nil {
if verbose {
fmt . Fprintf ( writer , "resolving manifest chunks in %s: %v\n" , util . NewFullPath ( entry . Dir , entry . Entry . Name ) , resolveErr )
}
return nil
}
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
}
}
return nil
} , func ( outputChan chan interface { } ) {
buffer := make ( [ ] byte , 8 )
for item := range outputChan {
i := item . ( * Item )
util . Uint64toBytes ( buffer , i . fileKey )
files [ i . vid ] . Write ( buffer )
}
} )
type Item struct {
vid uint32
fileKey uint64
cookie uint32
path util . FullPath
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
if verbose {
fmt . Fprintf ( writer , "find missing file chunks in dataNodeId %s volume %d ...\n" , dataNodeId , volumeId )
}
db := needle_map . NewMemDb ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
}
file := getFilerFileIdFile ( tempFolder , volumeId )
fp , err := os . Open ( file )
func ( c * commandVolumeFsck ) readFilerFileIdFile ( volumeId uint32 , fn func ( needleId types . NeedleId , itemPath util . FullPath ) ) error {
fp , err := os . Open ( getFilerFileIdFile ( c . tempFolder , volumeId ) )
if err != nil {
return
return err
}
defer fp . Close ( )
type Item struct {
fileKey uint64
cookie uint32
path util . FullPath
}
br := bufio . NewReader ( fp )
buffer := make ( [ ] byte , 16 )
item := & Item { }
buffer := make ( [ ] byte , readbufferSize )
var readSize int
var readErr error
item := & Item { vid : volumeId }
for {
readSize , err = io . ReadFull ( br , buffer )
if err != nil || readSize != 16 {
readSize , readErr = io . ReadFull ( br , buffer )
if errors . Is ( readErr , io . EOF ) {
break
}
if readErr != nil {
return readErr
}
if readSize != readbufferSize {
return fmt . Errorf ( "readSize mismatch" )
}
item . fileKey = util . BytesToUint64 ( buffer [ : 8 ] )
item . cookie = util . BytesToUint32 ( buffer [ 8 : 12 ] )
pathSize := util . BytesToUint32 ( buffer [ 12 : 16 ] )
pathBytes := make ( [ ] byte , int ( pathSize ) )
n , err := io . ReadFull ( br , pathBytes )
if err != nil {
fmt . Fprintf ( writer , "%d,%x%08x in unexpected error: %v\n" , volumeId , item . fileKey , item . cookie , err )
fmt . Fprintf ( c . writer , "%d,%x%08x in unexpected error: %v\n" , volumeId , item . fileKey , item . cookie , err )
}
if n != int ( pathSize ) {
fmt . Fprintf ( writer , "%d,%x%08x %d unexpected file name size %d\n" , volumeId , item . fileKey , item . cookie , pathSize , n )
fmt . Fprintf ( c . writer , "%d,%x%08x %d unexpected file name size %d\n" , volumeId , item . fileKey , item . cookie , pathSize , n )
}
item . path = util . FullPath ( string ( pathBytes ) )
item . path = util . FullPath ( pathBytes )
needleId := types . NeedleId ( item . fileKey )
if _ , found := db . Get ( needleId ) ; ! found {
fmt . Fprintf ( writer , "%s\n" , item . path )
fn ( needleId , item . path )
}
return nil
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( dataNodeId string , volumeId uint32 , applyPurging bool ) ( err error ) {
if * c . verbose {
fmt . Fprintf ( c . writer , "find missing file chunks in dataNodeId %s volume %d ...\n" , dataNodeId , volumeId )
}
db := needle_map . NewMemDb ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
}
if err = c . readFilerFileIdFile ( volumeId , func ( needleId types . NeedleId , itemPath util . FullPath ) {
if _ , found := db . Get ( needleId ) ; ! found {
fmt . Fprintf ( c . writer , "%s\n" , itemPath )
if applyPurging {
// defining the URL this way automatically escapes complex path names
c . httpDelete ( item . path , verbose )
c . httpDelete ( itemPath )
}
}
} ) ; err != nil {
return
}
return nil
}
func ( c * commandVolumeFsck ) httpDelete ( path util . FullPath , verbose bool ) {
func ( c * commandVolumeFsck ) httpDelete ( path util . FullPath ) {
req , err := http . NewRequest ( http . MethodDelete , "" , nil )
req . URL = & url . URL {
@ -546,69 +520,84 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
Host : c . env . option . FilerAddress . ToHttpAddress ( ) ,
Path : string ( path ) ,
}
if verbose {
fmt . Printf ( "full HTTP delete request to be sent: %v\n" , req )
if * c . verbose {
fmt . Fprintf ( c . writer , "full HTTP delete request to be sent: %v\n" , req )
}
if err != nil {
fmt . Errorf ( "HTTP delete request error: %v\n" , err )
fmt . Fprintf ( c . writer , "HTTP delete request error: %v\n" , err )
}
client := & http . Client { }
resp , err := client . Do ( req )
if err != nil {
fmt . Errorf ( "DELETE fetch error: %v\n" , err )
fmt . Fprintf ( c . writer , "DELETE fetch error: %v\n" , err )
}
defer resp . Body . Close ( )
_ , err = ioutil . ReadAll ( resp . Body )
if err != nil {
fmt . Errorf ( "DELETE response error: %v\n" , err )
fmt . Fprintf ( c . writer , "DELETE response error: %v\n" , err )
}
if verbose {
fmt . Println ( "delete response Status : " , resp . Status )
fmt . Println ( "delete response Headers : " , resp . Header )
if * c . verbose {
fmt . Fprintln ( c . writer , "delete response Status : " , resp . Status )
fmt . Fprintln ( c . writer , "delete response Headers : " , resp . Header )
}
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( dataNodeId string , volumeId uint32 , vinfo * VInfo ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
db := needle_map . NewMemDb ( )
defer db . Close ( )
volumeFileI dD b := needle_map . NewMemDb ( )
defer volumeFileI dD b. Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
if err = volumeFileIdDb . LoadFromIdx ( getVolumeFileIdFile ( c . tempFolder , dataNodeId , volumeId ) ) ; err != nil {
err = fmt . Errorf ( "failed to LoadFromIdx %+v" , err )
return
}
filerFileIdsData , err := os . ReadFile ( getFilerFileIdFile ( tempFolder , volumeId ) )
if err != nil {
return
}
dataLen := len ( filerFileIdsData )
if dataLen % 8 != 0 {
return 0 , nil , 0 , fmt . Errorf ( "filer data is corrupted" )
}
for i := 0 ; i < len ( filerFileIdsData ) ; i += 8 {
fileKey := util . BytesToUint64 ( filerFileIdsData [ i : i + 8 ] )
db . Delete ( types . NeedleId ( fileKey ) )
if err = c . readFilerFileIdFile ( volumeId , func ( filerNeedleId types . NeedleId , itemPath util . FullPath ) {
inUseCount ++
if * c . verifyNeedle {
if needleValue , ok := volumeFileIdDb . Get ( filerNeedleId ) ; ok && ! needleValue . Size . IsDeleted ( ) {
if _ , err := readNeedleStatus ( c . env . option . GrpcDialOption , vinfo . server , volumeId , * needleValue ) ; err != nil {
// files may be deleted during copying filesIds
if ! strings . Contains ( err . Error ( ) , storage . ErrorDeleted . Error ( ) ) {
fmt . Fprintf ( c . writer , "failed to read %d:%s needle status of file %s: %+v\n" ,
volumeId , filerNeedleId . String ( ) , itemPath , err )
if * c . forcePurging {
return
}
}
}
}
}
if err = volumeFileIdDb . Delete ( filerNeedleId ) ; err != nil && * c . verbose {
fmt . Fprintf ( c . writer , "failed to nm.delete %s(%+v): %+v" , itemPath , filerNeedleId , err )
}
} ) ; err != nil {
err = fmt . Errorf ( "failed to readFilerFileIdFile %+v" , err )
return
}
var orphanFileCount uint64
db . AscendingVisit ( func ( n needle_map . NeedleValue ) error {
// fmt.Printf("%d,%x\n", volumeId, n.Key)
orphanFileIds = append ( orphanFileIds , fmt . Sprintf ( "%d,%s00000000" , volumeId , n . Key . String ( ) ) )
if err = volumeFileIdDb . AscendingVisit ( func ( n needle_map . NeedleValue ) error {
if n . Size . IsDeleted ( ) {
return nil
}
orphanFileIds = append ( orphanFileIds , n . Key . FileId ( volumeId ) )
orphanFileCount ++
orphanDataSize += uint64 ( n . Size )
return nil
} )
} ) ; err != nil {
err = fmt . Errorf ( "failed to AscendingVisit %+v" , err )
return
}
if orphanFileCount > 0 {
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
fmt . Fprintf ( writer , "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
fmt . Fprintf ( c . writer , "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
dataNodeId , volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
}
@ -623,15 +612,15 @@ type VInfo struct {
isReadOnly bool
}
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
func ( c * commandVolumeFsck ) collectVolumeIds ( ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
if verbose {
fmt . Fprintf ( writer , "collecting volume id and locations from master ...\n" )
if * c . verbose {
fmt . Fprintf ( c . writer , "collecting volume id and locations from master ...\n" )
}
volumeIdToServer = make ( map [ string ] map [ uint32 ] VInfo )
// collect topology information
topologyInfo , _ , err := collectTopologyInfo ( commandE nv , 0 )
topologyInfo , _ , err := collectTopologyInfo ( c . e nv, 0 )
if err != nil {
return
}
@ -656,17 +645,16 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
isReadOnly : true ,
}
}
if * c . verbose {
fmt . Fprintf ( c . writer , "dn %+v collected %d volumes and locations.\n" , dataNodeId , len ( volumeIdToServer [ dataNodeId ] ) )
}
}
} )
if verbose {
fmt . Fprintf ( writer , "collected %d volumes and locations.\n" , len ( volumeIdToServer ) )
}
return
}
func ( c * commandVolumeFsck ) purgeFileIdsForOneVolume ( volumeId uint32 , fileIds [ ] string , writer io . Writer ) ( err error ) {
fmt . Fprintf ( writer , "purging orphan data for volume %d...\n" , volumeId )
func ( c * commandVolumeFsck ) purgeFileIdsForOneVolume ( volumeId uint32 , fileIds [ ] string ) ( err error ) {
fmt . Fprintf ( c . writer , "purging orphan data for volume %d...\n" , volumeId )
locations , found := c . env . MasterClient . GetLocations ( volumeId )
if ! found {
return fmt . Errorf ( "failed to find volume %d locations" , volumeId )
@ -693,7 +681,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
for results := range resultChan {
for _ , result := range results {
if result . Error != "" {
fmt . Fprintf ( writer , "purge error: %s\n" , result . Error )
fmt . Fprintf ( c . writer , "purge error: %s\n" , result . Error )
}
}
}
@ -701,6 +689,13 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
return
}
func ( c * commandVolumeFsck ) getCollectFilerFilePath ( ) string {
if * c . collection != "" {
return fmt . Sprintf ( "%s/%s" , c . bucketsPath , * c . collection )
}
return "/"
}
func getVolumeFileIdFile ( tempFolder string , dataNodeid string , vid uint32 ) string {
return filepath . Join ( tempFolder , fmt . Sprintf ( "%s_%d.idx" , dataNodeid , vid ) )
}
@ -720,14 +715,3 @@ func writeToFile(bytes []byte, fileName string) error {
dst . Write ( bytes )
return nil
}
func filterDeletedNeedleFromIdx ( arr [ ] byte ) bytes . Buffer {
var filteredBuf bytes . Buffer
for i := 0 ; i < len ( arr ) ; i += types . NeedleMapEntrySize {
size := types . BytesToSize ( arr [ i + types . NeedleIdSize + types . OffsetSize : i + types . NeedleIdSize + types . OffsetSize + types . SizeSize ] )
if size > 0 {
filteredBuf . Write ( arr [ i : i + types . NeedleIdSize + types . OffsetSize + types . SizeSize ] )
}
}
return filteredBuf
}