@ -11,6 +11,7 @@ import (
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"sync"
@ -65,8 +66,11 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
verbose := fsckCommand . Bool ( "v" , false , "verbose mode" )
findMissingChunksInFiler := fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
findMissingChunksInFilerPath := fsckCommand . String ( "findMissingChunksInFilerPath" , "/" , "used together with findMissingChunksInFiler" )
findMissingChunksInVolumeId := fsckCommand . Int ( "findMissingChunksInVolumeId" , 0 , "used together with findMissingChunksInFiler" )
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer" )
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
tempPath := fsckCommand . String ( "tempPath" , path . Join ( os . TempDir ( ) ) , "path for temporary idx files" )
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
}
@ -78,7 +82,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c . env = commandEnv
// create a temp folder
tempFolder , err := os . MkdirTemp ( "" , "sw_fsck" )
tempFolder , err := os . MkdirTemp ( * tempPath , "sw_fsck" )
if err != nil {
return fmt . Errorf ( "failed to create temp folder: %v" , err )
}
@ -88,14 +92,14 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
defer os . RemoveAll ( tempFolder )
// collect all volume id locations
v olumeIdToVInfo, err := c . collectVolumeIds ( commandEnv , * verbose , writer )
dataNodeV olumeIdToVInfo, err := c . collectVolumeIds ( commandEnv , * verbose , writer )
if err != nil {
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
}
isBucketsPath := false
var fillerBucketsPath string
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "" {
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "/ " {
fillerBucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
@ -109,33 +113,41 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
}
// collect each volume file ids
for volumeId , vinfo := range volumeIdToVInfo {
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
delete ( volumeIdToVInfo , volumeId )
continue
}
err = c . collectOneVolumeFileIds ( tempFolder , volumeId , vinfo , * verbose , writer )
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
if * findMissingChunksInVolumeId > 0 && uint32 ( * findMissingChunksInVolumeId ) != volumeId {
delete ( volumeIdToVInfo , volumeId )
continue
}
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
delete ( volumeIdToVInfo , volumeId )
continue
}
err = c . collectOneVolumeFileIds ( tempFolder , dataNodeId , volumeId , vinfo , * verbose , writer )
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
}
}
}
if * findMissingChunksInFiler {
// collect all filer file ids and paths
if err = c . collectFilerFileIdAndPaths ( volumeIdToVInfo , tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent ) ; err != nil {
if err = c . collectFilerFileIdAndPaths ( dataNodeV olumeIdToVInfo, tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent ) ; err != nil {
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
}
// for each volume, check filer file ids
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , dataNodeId , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
}
}
} else {
// collect all filer file ids
if err = c . collectFilerFileIds ( v olumeIdToVInfo, tempFolder , writer , * verbose ) ; err != nil {
if err = c . collectFilerFileIds ( dataNodeV olumeIdToVInfo, tempFolder , writer , * verbose ) ; err != nil {
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
}
// volume file ids subtract filer file ids
if err = c . findExtraChunksInVolumeServers ( v olumeIdToVInfo, tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
if err = c . findExtraChunksInVolumeServers ( dataNodeV olumeIdToVInfo, tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
}
}
@ -143,19 +155,23 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( volumeIdToServer map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool ) error {
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool ) error {
if verbose {
fmt . Fprintf ( writer , "checking each file from filer ...\n" )
}
files := make ( map [ uint32 ] * os . File )
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
if _ , ok := volumeIdToServer [ vid ] ; ! ok {
files [ vid ] = dst
}
}
files [ vid ] = dst
}
defer func ( ) {
for _ , f := range files {
@ -210,10 +226,10 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
}
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , dataNodeId string , writer io . Writer , verbose bool , applyPurging bool ) error {
for volumeId , vinfo := range volumeIdToVInfo {
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , volumeId , writer , verbose , applyPurging )
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , dataNodeId , volumeId , writer , verbose , applyPurging )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
@ -221,55 +237,57 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
return nil
}
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
for volumeId , vinfo := range volumeIdToVInfo {
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , volumeId , writer , verbose )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
if verbose {
for _ , fid := range orphanFileIds {
fmt . Fprintf ( writer , "%s\n" , fid )
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , dataNodeId , volumeId , writer , verbose )
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
}
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
if applyPurging && len ( orphanFileIds ) > 0 {
if verbose {
fmt . Fprintf ( writer , "purging process for volume %d" , volumeId )
}
if vinfo . isEcVolume {
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
continue
for _ , fid := range orphanFileIds {
fmt . Fprintf ( writer , "%s\n" , fid )
}
}
needleVID := needle . VolumeId ( volumeId )
if applyPurging && len ( orphanFileIds ) > 0 {
if verbose {
fmt . Fprintf ( writer , "purging process for volume %d" , volumeId )
}
if vinfo . isReadOnly {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , true )
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
if vinfo . isEcVolume {
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
continue
}
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , false )
}
needleVID := needle . VolumeId ( volumeId )
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
if vinfo . isReadOnly {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , true )
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
}
if verbose {
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
}
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , false )
}
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds , writer ) ; err != nil {
return fmt . Errorf ( "purging volume %d: %v" , volumeId , err )
if verbose {
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
}
if err := c . purgeFileIdsForOneVolume ( volumeId , orphanFileIds , writer ) ; err != nil {
return fmt . Errorf ( "purging volume %d: %v" , volumeId , err )
}
}
}
}
@ -290,7 +308,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
return nil
}
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer ) error {
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer ) error {
if verbose {
fmt . Fprintf ( writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
@ -316,7 +334,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
}
err = writeToFile ( copyFileClient , getVolumeFileIdFile ( tempFolder , volumeId ) )
err = writeToFile ( copyFileClient , getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) )
if err != nil {
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
}
@ -327,19 +345,21 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
}
func ( c * commandVolumeFsck ) collectFilerFileIds ( volumeIdToServer map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
func ( c * commandVolumeFsck ) collectFilerFileIds ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
if verbose {
fmt . Fprintf ( writer , "collecting file ids from filer ...\n" )
}
files := make ( map [ uint32 ] * os . File )
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
files [ vid ] = dst
}
files [ vid ] = dst
}
defer func ( ) {
for _ , f := range files {
@ -377,7 +397,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
} )
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
if verbose {
fmt . Fprintf ( writer , "find missing file chunks in volume %d ...\n" , volumeId )
@ -386,7 +406,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo
db := needle_map . NewMemDb ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , volumeId ) ) ; err != nil {
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
}
@ -473,12 +493,12 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
}
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
db := needle_map . NewMemDb ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , volumeId ) ) ; err != nil {
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
}
@ -509,8 +529,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
if orphanFileCount > 0 {
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
fmt . Fprintf ( writer , "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
fmt . Fprintf ( writer , "dataNode:%s\t volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
dataNodeId , volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
}
return
@ -524,13 +544,13 @@ type VInfo struct {
isReadOnly bool
}
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ uint32 ] VInfo , err error ) {
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
if verbose {
fmt . Fprintf ( writer , "collecting volume id and locations from master ...\n" )
}
volumeIdToServer = make ( map [ uint32 ] VInfo )
volumeIdToServer = make ( map [ string ] map [ uint32 ] VInfo )
// collect topology information
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
if err != nil {
@ -539,8 +559,10 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
eachDataNode ( topologyInfo , func ( dc string , rack RackId , t * master_pb . DataNodeInfo ) {
for _ , diskInfo := range t . DiskInfos {
dataNodeId := t . GetId ( )
volumeIdToServer [ dataNodeId ] = make ( map [ uint32 ] VInfo )
for _ , vi := range diskInfo . VolumeInfos {
volumeIdToServer [ vi . Id ] = VInfo {
volumeIdToServer [ dataNodeId ] [ vi . Id ] = VInfo {
server : pb . NewServerAddressFromDataNode ( t ) ,
collection : vi . Collection ,
isEcVolume : false ,
@ -548,7 +570,7 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
}
}
for _ , ecShardInfo := range diskInfo . EcShardInfos {
volumeIdToServer [ ecShardInfo . Id ] = VInfo {
volumeIdToServer [ dataNodeId ] [ ecShardInfo . Id ] = VInfo {
server : pb . NewServerAddressFromDataNode ( t ) ,
collection : ecShardInfo . Collection ,
isEcVolume : true ,
@ -600,8 +622,8 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
return
}
func getVolumeFileIdFile ( tempFolder string , vid uint32 ) string {
return filepath . Join ( tempFolder , fmt . Sprintf ( "%d.idx" , vid ) )
func getVolumeFileIdFile ( tempFolder string , dataNodeid string , vid uint32 ) string {
return filepath . Join ( tempFolder , fmt . Sprintf ( "%s_% d.idx" , dataNodeid , vid ) )
}
func getFilerFileIdFile ( tempFolder string , vid uint32 ) string {