@ -5,16 +5,6 @@ import (
"context"
"context"
"flag"
"flag"
"fmt"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb"
@ -25,6 +15,17 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
)
func init ( ) {
func init ( ) {
@ -65,8 +66,11 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
verbose := fsckCommand . Bool ( "v" , false , "verbose mode" )
verbose := fsckCommand . Bool ( "v" , false , "verbose mode" )
findMissingChunksInFiler := fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
findMissingChunksInFiler := fsckCommand . Bool ( "findMissingChunksInFiler" , false , "see \"help volume.fsck\"" )
findMissingChunksInFilerPath := fsckCommand . String ( "findMissingChunksInFilerPath" , "/" , "used together with findMissingChunksInFiler" )
findMissingChunksInFilerPath := fsckCommand . String ( "findMissingChunksInFilerPath" , "/" , "used together with findMissingChunksInFiler" )
findMissingChunksInVolumeId := fsckCommand . Int ( "findMissingChunksInVolumeId" , 0 , "used together with findMissingChunksInFiler" )
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer" )
applyPurging := fsckCommand . Bool ( "reallyDeleteFromVolume" , false , "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer" )
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
purgeAbsent := fsckCommand . Bool ( "reallyDeleteFilerEntries" , false , "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler" )
tempPath := fsckCommand . String ( "tempPath" , path . Join ( os . TempDir ( ) ) , "path for temporary idx files" )
if err = fsckCommand . Parse ( args ) ; err != nil {
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
return nil
}
}
@ -78,7 +82,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c . env = commandEnv
c . env = commandEnv
// create a temp folder
// create a temp folder
tempFolder , err := os . MkdirTemp ( "" , "sw_fsck" )
tempFolder , err := os . MkdirTemp ( * tempPath , "sw_fsck" )
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to create temp folder: %v" , err )
return fmt . Errorf ( "failed to create temp folder: %v" , err )
}
}
@ -88,14 +92,14 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
defer os . RemoveAll ( tempFolder )
defer os . RemoveAll ( tempFolder )
// collect all volume id locations
// collect all volume id locations
v olumeIdToVInfo, err := c . collectVolumeIds ( commandEnv , * verbose , writer )
dataNodeV olumeIdToVInfo, err := c . collectVolumeIds ( commandEnv , * verbose , writer )
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
return fmt . Errorf ( "failed to collect all volume locations: %v" , err )
}
}
isBucketsPath := false
isBucketsPath := false
var fillerBucketsPath string
var fillerBucketsPath string
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "" {
if * findMissingChunksInFiler && * findMissingChunksInFilerPath != "/ " {
fillerBucketsPath , err = readFilerBucketsPath ( commandEnv )
fillerBucketsPath , err = readFilerBucketsPath ( commandEnv )
if err != nil {
if err != nil {
return fmt . Errorf ( "read filer buckets path: %v" , err )
return fmt . Errorf ( "read filer buckets path: %v" , err )
@ -108,34 +112,43 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt . Errorf ( "read filer buckets path: %v" , err )
return fmt . Errorf ( "read filer buckets path: %v" , err )
}
}
collectMtime := time . Now ( ) . Unix ( )
// collect each volume file ids
// collect each volume file ids
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
if * findMissingChunksInVolumeId > 0 && uint32 ( * findMissingChunksInVolumeId ) != volumeId {
delete ( volumeIdToVInfo , volumeId )
continue
}
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
if isBucketsPath && ! strings . HasPrefix ( * findMissingChunksInFilerPath , fillerBucketsPath + "/" + vinfo . collection ) {
delete ( volumeIdToVInfo , volumeId )
delete ( volumeIdToVInfo , volumeId )
continue
continue
}
}
err = c . collectOneVolumeFileIds ( tempFolder , volumeId , vinfo , * verbose , writer )
err = c . collectOneVolumeFileIds ( tempFolder , dataNodeId , volumeId , vinfo , * verbose , writer )
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , err )
}
}
}
}
}
if * findMissingChunksInFiler {
if * findMissingChunksInFiler {
// collect all filer file ids and paths
// collect all filer file ids and paths
if err = c . collectFilerFileIdAndPaths ( v olumeIdToVInfo, tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent ) ; err != nil {
if err = c . collectFilerFileIdAndPaths ( dataNodeV olumeIdToVInfo, tempFolder , writer , * findMissingChunksInFilerPath , * verbose , * purgeAbsent , collectMtime ) ; err != nil {
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
return fmt . Errorf ( "collectFilerFileIdAndPaths: %v" , err )
}
}
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
// for each volume, check filer file ids
// for each volume, check filer file ids
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
if err = c . findFilerChunksMissingInVolumeServers ( volumeIdToVInfo , tempFolder , dataNodeId , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
return fmt . Errorf ( "findFilerChunksMissingInVolumeServers: %v" , err )
}
}
}
} else {
} else {
// collect all filer file ids
// collect all filer file ids
if err = c . collectFilerFileIds ( v olumeIdToVInfo, tempFolder , writer , * verbose ) ; err != nil {
if err = c . collectFilerFileIds ( dataNodeV olumeIdToVInfo, tempFolder , writer , * verbose ) ; err != nil {
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
return fmt . Errorf ( "failed to collect file ids from filer: %v" , err )
}
}
// volume file ids subtract filer file ids
// volume file ids subtract filer file ids
if err = c . findExtraChunksInVolumeServers ( v olumeIdToVInfo, tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
if err = c . findExtraChunksInVolumeServers ( dataNodeV olumeIdToVInfo, tempFolder , writer , * verbose , * applyPurging ) ; err != nil {
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
return fmt . Errorf ( "findExtraChunksInVolumeServers: %v" , err )
}
}
}
}
@ -143,20 +156,25 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
return nil
}
}
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( volumeIdToServer map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool ) error {
func ( c * commandVolumeFsck ) collectFilerFileIdAndPaths ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , filerPath string , verbose bool , purgeAbsent bool , collectMtime int64 ) error {
if verbose {
if verbose {
fmt . Fprintf ( writer , "checking each file from filer ...\n" )
fmt . Fprintf ( writer , "checking each file from filer ...\n" )
}
}
files := make ( map [ uint32 ] * os . File )
files := make ( map [ uint32 ] * os . File )
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
for vid := range volumeIdToServer {
if _ , ok := files [ vid ] ; ok {
continue
}
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
if openErr != nil {
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
return fmt . Errorf ( "failed to create file %s: %v" , getFilerFileIdFile ( tempFolder , vid ) , openErr )
}
}
files [ vid ] = dst
files [ vid ] = dst
}
}
}
defer func ( ) {
defer func ( ) {
for _ , f := range files {
for _ , f := range files {
f . Close ( )
f . Close ( )
@ -179,6 +197,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
}
}
dataChunks = append ( dataChunks , manifestChunks ... )
dataChunks = append ( dataChunks , manifestChunks ... )
for _ , chunk := range dataChunks {
for _ , chunk := range dataChunks {
if chunk . Mtime > collectMtime {
continue
}
outputChan <- & Item {
outputChan <- & Item {
vid : chunk . Fid . VolumeId ,
vid : chunk . Fid . VolumeId ,
fileKey : chunk . Fid . FileKey ,
fileKey : chunk . Fid . FileKey ,
@ -210,10 +231,10 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
}
}
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findFilerChunksMissingInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , dataNodeId string , writer io . Writer , verbose bool , applyPurging bool ) error {
for volumeId , vinfo := range volumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , volumeId , writer , verbose , applyPurging )
checkErr := c . oneVolumeFileIdsCheckOneVolume ( tempFolder , dataNodeId , volumeId , writer , verbose , applyPurging )
if checkErr != nil {
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
}
@ -221,15 +242,35 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
return nil
return nil
}
}
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( volumeIdToVInfo map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
func ( c * commandVolumeFsck ) findExtraChunksInVolumeServers ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool , applyPurging bool ) error {
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
var totalInUseCount , totalOrphanChunkCount , totalOrphanDataSize uint64
volumeIdOrphanFileIds := make ( map [ uint32 ] map [ string ] bool )
isSeveralReplicas := make ( map [ uint32 ] bool )
isEcVolumeReplicas := make ( map [ uint32 ] bool )
isReadOnlyReplicas := make ( map [ uint32 ] bool )
serverReplicas := make ( map [ uint32 ] [ ] pb . ServerAddress )
for dataNodeId , volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
for volumeId , vinfo := range volumeIdToVInfo {
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , volumeId , writer , verbose )
inUseCount , orphanFileIds , orphanDataSize , checkErr := c . oneVolumeFileIdsSubtractFilerFileIds ( tempFolder , dataNodeId , volumeId , writer , verbose )
if checkErr != nil {
if checkErr != nil {
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
return fmt . Errorf ( "failed to collect file ids from volume %d on %s: %v" , volumeId , vinfo . server , checkErr )
}
}
isSeveralReplicas [ volumeId ] = false
if _ , found := volumeIdOrphanFileIds [ volumeId ] ; ! found {
volumeIdOrphanFileIds [ volumeId ] = make ( map [ string ] bool )
} else {
isSeveralReplicas [ volumeId ] = true
}
for _ , fid := range orphanFileIds {
if isSeveralReplicas [ volumeId ] {
if _ , found := volumeIdOrphanFileIds [ volumeId ] [ fid ] ; ! found {
continue
}
}
volumeIdOrphanFileIds [ volumeId ] [ fid ] = isSeveralReplicas [ volumeId ]
}
totalInUseCount += inUseCount
totalInUseCount += inUseCount
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanChunkCount += uint64 ( len ( orphanFileIds ) )
totalOrphanDataSize += orphanDataSize
totalOrphanDataSize += orphanDataSize
@ -239,31 +280,48 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
fmt . Fprintf ( writer , "%s\n" , fid )
fmt . Fprintf ( writer , "%s\n" , fid )
}
}
}
}
isEcVolumeReplicas [ volumeId ] = vinfo . isEcVolume
if isReadOnly , found := isReadOnlyReplicas [ volumeId ] ; ! ( found && isReadOnly ) {
isReadOnlyReplicas [ volumeId ] = vinfo . isReadOnly
}
serverReplicas [ volumeId ] = append ( serverReplicas [ volumeId ] , vinfo . server )
}
if applyPurging && len ( orphanFileIds ) > 0 {
for volumeId , orphanReplicaFileIds := range volumeIdOrphanFileIds {
if ! ( applyPurging && len ( orphanReplicaFileIds ) > 0 ) {
continue
}
orphanFileIds := [ ] string { }
for fid , foundInAllReplicas := range orphanReplicaFileIds {
if ! isSeveralReplicas [ volumeId ] || ( isSeveralReplicas [ volumeId ] && foundInAllReplicas ) {
orphanFileIds = append ( orphanFileIds , fid )
}
}
if ! ( len ( orphanFileIds ) > 0 ) {
continue
}
if verbose {
if verbose {
fmt . Fprintf ( writer , "purging process for volume %d" , volumeId )
fmt . Fprintf ( writer , "purging process for volume %d" , volumeId )
}
}
if vinfo . isEcVolume {
if isEcVolumeReplicas [ volumeId ] {
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
fmt . Fprintf ( writer , "skip purging for Erasure Coded volume %d.\n" , volumeId )
continue
continue
}
}
for _ , server := range serverReplicas [ volumeId ] {
needleVID := needle . VolumeId ( volumeId )
needleVID := needle . VolumeId ( volumeId )
if vinfo . isReadOnly {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , true )
if isReadOnlyReplicas [ volumeId ] {
err := markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , true )
if err != nil {
if err != nil {
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
return fmt . Errorf ( "mark volume %d read/write: %v" , volumeId , err )
}
}
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , vinfo . server , false )
}
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , vinfo . server )
fmt . Fprintf ( writer , "temporarily marked %d on server %v writable for forced purge\n" , volumeId , server )
defer markVolumeWritable ( c . env . option . GrpcDialOption , needleVID , server , false )
fmt . Fprintf ( writer , "marked %d on server %v writable for forced purge\n" , volumeId , server )
}
if verbose {
if verbose {
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
fmt . Fprintf ( writer , "purging files from volume %d\n" , volumeId )
}
}
@ -273,6 +331,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
}
}
}
}
}
}
}
if ! applyPurging {
if ! applyPurging {
pct := float64 ( totalOrphanChunkCount * 100 ) / ( float64 ( totalOrphanChunkCount + totalInUseCount ) )
pct := float64 ( totalOrphanChunkCount * 100 ) / ( float64 ( totalOrphanChunkCount + totalInUseCount ) )
@ -290,7 +349,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
return nil
return nil
}
}
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer ) error {
func ( c * commandVolumeFsck ) collectOneVolumeFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , vinfo VInfo , verbose bool , writer io . Writer ) error {
if verbose {
if verbose {
fmt . Fprintf ( writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
fmt . Fprintf ( writer , "collecting volume %d file ids from %s ...\n" , volumeId , vinfo . server )
@ -316,7 +375,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
}
}
err = writeToFile ( copyFileClient , getVolumeFileIdFile ( tempFolder , volumeId ) )
err = writeToFile ( copyFileClient , getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) )
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , vinfo . server , err )
}
}
@ -327,13 +386,14 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
}
}
func ( c * commandVolumeFsck ) collectFilerFileIds ( volumeIdToServer map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
func ( c * commandVolumeFsck ) collectFilerFileIds ( dataNodeVolumeIdToVInfo map [ string ] map [ uint32 ] VInfo , tempFolder string , writer io . Writer , verbose bool ) error {
if verbose {
if verbose {
fmt . Fprintf ( writer , "collecting file ids from filer ...\n" )
fmt . Fprintf ( writer , "collecting file ids from filer ...\n" )
}
}
files := make ( map [ uint32 ] * os . File )
files := make ( map [ uint32 ] * os . File )
for _ , volumeIdToServer := range dataNodeVolumeIdToVInfo {
for vid := range volumeIdToServer {
for vid := range volumeIdToServer {
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
dst , openErr := os . OpenFile ( getFilerFileIdFile ( tempFolder , vid ) , os . O_WRONLY | os . O_CREATE | os . O_TRUNC , 0644 )
if openErr != nil {
if openErr != nil {
@ -341,6 +401,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
}
}
files [ vid ] = dst
files [ vid ] = dst
}
}
}
defer func ( ) {
defer func ( ) {
for _ , f := range files {
for _ , f := range files {
f . Close ( )
f . Close ( )
@ -377,16 +438,16 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
} )
} )
}
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
func ( c * commandVolumeFsck ) oneVolumeFileIdsCheckOneVolume ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool , applyPurging bool ) ( err error ) {
if verbose {
if verbose {
fmt . Fprintf ( writer , "find missing file chunks in volume %d ...\n" , volumeId )
fmt . Fprintf ( writer , "find missing file chunks in dataNodeId %s volume %d ...\n" , dataNodeId , volumeId )
}
}
db := needle_map . NewMemDb ( )
db := needle_map . NewMemDb ( )
defer db . Close ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , volumeId ) ) ; err != nil {
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
return
}
}
@ -473,12 +534,12 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
}
}
}
}
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
func ( c * commandVolumeFsck ) oneVolumeFileIdsSubtractFilerFileIds ( tempFolder string , dataNodeId string , volumeId uint32 , writer io . Writer , verbose bool ) ( inUseCount uint64 , orphanFileIds [ ] string , orphanDataSize uint64 , err error ) {
db := needle_map . NewMemDb ( )
db := needle_map . NewMemDb ( )
defer db . Close ( )
defer db . Close ( )
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , volumeId ) ) ; err != nil {
if err = db . LoadFromIdx ( getVolumeFileIdFile ( tempFolder , dataNodeId , volumeId ) ) ; err != nil {
return
return
}
}
@ -509,8 +570,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
if orphanFileCount > 0 {
if orphanFileCount > 0 {
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
pct := float64 ( orphanFileCount * 100 ) / ( float64 ( orphanFileCount + inUseCount ) )
fmt . Fprintf ( writer , "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
fmt . Fprintf ( writer , "dataNode:%s\t volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n" ,
dataNodeId , volumeId , orphanFileCount + inUseCount , orphanFileCount , pct , orphanDataSize )
}
}
return
return
@ -524,13 +585,13 @@ type VInfo struct {
isReadOnly bool
isReadOnly bool
}
}
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ uint32 ] VInfo , err error ) {
func ( c * commandVolumeFsck ) collectVolumeIds ( commandEnv * CommandEnv , verbose bool , writer io . Writer ) ( volumeIdToServer map [ string ] map [ uint32 ] VInfo , err error ) {
if verbose {
if verbose {
fmt . Fprintf ( writer , "collecting volume id and locations from master ...\n" )
fmt . Fprintf ( writer , "collecting volume id and locations from master ...\n" )
}
}
volumeIdToServer = make ( map [ uint32 ] VInfo )
volumeIdToServer = make ( map [ string ] map [ uint32 ] VInfo )
// collect topology information
// collect topology information
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
if err != nil {
if err != nil {
@ -539,8 +600,10 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
eachDataNode ( topologyInfo , func ( dc string , rack RackId , t * master_pb . DataNodeInfo ) {
eachDataNode ( topologyInfo , func ( dc string , rack RackId , t * master_pb . DataNodeInfo ) {
for _ , diskInfo := range t . DiskInfos {
for _ , diskInfo := range t . DiskInfos {
dataNodeId := t . GetId ( )
volumeIdToServer [ dataNodeId ] = make ( map [ uint32 ] VInfo )
for _ , vi := range diskInfo . VolumeInfos {
for _ , vi := range diskInfo . VolumeInfos {
volumeIdToServer [ vi . Id ] = VInfo {
volumeIdToServer [ dataNodeId ] [ vi . Id ] = VInfo {
server : pb . NewServerAddressFromDataNode ( t ) ,
server : pb . NewServerAddressFromDataNode ( t ) ,
collection : vi . Collection ,
collection : vi . Collection ,
isEcVolume : false ,
isEcVolume : false ,
@ -548,7 +611,7 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
}
}
}
}
for _ , ecShardInfo := range diskInfo . EcShardInfos {
for _ , ecShardInfo := range diskInfo . EcShardInfos {
volumeIdToServer [ ecShardInfo . Id ] = VInfo {
volumeIdToServer [ dataNodeId ] [ ecShardInfo . Id ] = VInfo {
server : pb . NewServerAddressFromDataNode ( t ) ,
server : pb . NewServerAddressFromDataNode ( t ) ,
collection : ecShardInfo . Collection ,
collection : ecShardInfo . Collection ,
isEcVolume : true ,
isEcVolume : true ,
@ -600,8 +663,8 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
return
return
}
}
func getVolumeFileIdFile ( tempFolder string , vid uint32 ) string {
return filepath . Join ( tempFolder , fmt . Sprintf ( "%d.idx" , vid ) )
func getVolumeFileIdFile ( tempFolder string , dataNodeid string , vid uint32 ) string {
return filepath . Join ( tempFolder , fmt . Sprintf ( "%s_% d.idx" , dataNodeid , vid ) )
}
}
func getFilerFileIdFile ( tempFolder string , vid uint32 ) string {
func getFilerFileIdFile ( tempFolder string , vid uint32 ) string {