@ -12,9 +12,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/exp/slices"
"io"
"math"
"strings"
"sync"
"time"
)
@ -24,10 +26,14 @@ func init() {
// commandFsVerify carries the state shared by the methods of this command:
// the parsed flags, the output destination, and the volume/server lookup
// tables built from the cluster topology.
type commandFsVerify struct {
	// env is the shell environment handed to Do; it supplies the gRPC dial
	// option and is passed to collectTopologyInfo.
	env *CommandEnv
	// volumeServers lists each distinct volume server address seen while
	// walking the topology.
	volumeServers []pb.ServerAddress
	// volumeIds maps a volume id to the server addresses recorded for it.
	volumeIds map[uint32][]pb.ServerAddress
	// verbose is the "v" flag.
	verbose *bool
	// concurrency is the "concurrency" flag; it is used as the buffer size
	// of each waitChan entry, and 0 selects the inline verification path.
	concurrency *int
	// modifyTimeAgoAtSec is the "modifyTimeAgo" flag converted to seconds.
	modifyTimeAgoAtSec int64
	// writer receives all command output.
	writer io.Writer
	// waitChan holds one buffered channel per volume server, keyed by the
	// server address string. A slot is taken before a verification goroutine
	// starts and released when it finishes.
	waitChan map[string]chan struct{}
	// waitChanLock is held (read-locked) around lookups in waitChan.
	waitChanLock sync.RWMutex
}
func ( c * commandFsVerify ) Name ( ) string {
@ -45,10 +51,10 @@ func (c *commandFsVerify) Help() string {
// Do stores the environment and writer, parses the command flags, builds the
// volume lookup tables via collectVolumeIds, runs verifyTraverseBfs, and
// prints a summary line when the traversal returns no error.
// NOTE(review): lines starting with "@" are patch hunk markers; statements
// between hunks are not visible here, so this description covers only what is shown.
func ( c * commandFsVerify ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
c . env = commandEnv
c . writer = writer
fsVerifyCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
c . verbose = fsVerifyCommand . Bool ( "v" , false , "print out each processed files" )
modifyTimeAgo := fsVerifyCommand . Duration ( "modifyTimeAgo" , 0 , "only include files after this modify time to verify" )
// concurrency defaults to 0, in which case no per-server channels are created below.
c . concurrency = fsVerifyCommand . Int ( "concurrency" , 0 , "number of parallel verification per volume server" )
if err = fsVerifyCommand . Parse ( args ) ; err != nil {
return err
@ -60,13 +66,23 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
c . modifyTimeAgoAtSec = int64 ( modifyTimeAgo . Seconds ( ) )
c . volumeIds = make ( map [ uint32 ] [ ] pb . ServerAddress )
c . waitChan = make ( map [ string ] chan struct { } )
c . volumeServers = [ ] pb . ServerAddress { }
// NOTE(review): the error from collectVolumeIds is bound to a new, block-scoped
// err and then parseErr is returned instead of it. parseErr is defined in the
// elided part; if it is nil here, a topology failure would be reported as
// success — confirm whether `return err` was intended.
if err := c . collectVolumeIds ( ) ; err != nil {
return parseErr
}
// NOTE(review): the traversal call below also appears after the channel setup.
// Presumably this first occurrence is the pre-change position and only the
// later one remains in the applied file — confirm.
fCount , eConut , terr := c . verifyTraverseBfs ( path )
// With a positive concurrency, create one buffered channel per known volume
// server, with capacity equal to the concurrency value.
if * c . concurrency > 0 {
for _ , volumeServer := range c . volumeServers {
volumeServerStr := string ( volumeServer )
c . waitChan [ volumeServerStr ] = make ( chan struct { } , * c . concurrency )
// Deferred inside the loop: every channel is closed when Do returns, not per iteration.
defer close ( c . waitChan [ volumeServerStr ] )
}
}
fCount , eConut , terr := c . verifyTraverseBfs ( path )
// The summary is printed only when the traversal itself reported no error.
if terr == nil {
fmt . Fprintf ( writer , "verified %d files, error %d files \n" , fCount , eConut )
}
@ -76,7 +92,6 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
// collectVolumeIds fetches the topology with collectTopologyInfo and, through
// the callback given to eachDataNode, records for every volume found on a
// node's disks the node's server address under that volume id in c.volumeIds.
// Each address not yet present in c.volumeServers is appended to it.
// Returns the collectTopologyInfo error, otherwise nil.
func ( c * commandFsVerify ) collectVolumeIds ( ) error {
// Resets the map on every call, discarding any previous contents.
c . volumeIds = make ( map [ uint32 ] [ ] pb . ServerAddress )
topologyInfo , _ , err := collectTopologyInfo ( c . env , 0 )
if err != nil {
return err
@ -84,15 +99,19 @@ func (c *commandFsVerify) collectVolumeIds() error {
eachDataNode ( topologyInfo , func ( dc string , rack RackId , nodeInfo * master_pb . DataNodeInfo ) {
for _ , diskInfo := range nodeInfo . DiskInfos {
for _ , vi := range diskInfo . VolumeInfos {
// NOTE(review): this append and the one two lines below add the same node
// address for vi.Id. They look like the before/after sides of one patched
// line — confirm only one is present in the applied file.
c . volumeIds [ vi . Id ] = append ( c . volumeIds [ vi . Id ] , pb . NewServerAddressFromDataNode ( nodeInfo ) )
volumeServer := pb . NewServerAddressFromDataNode ( nodeInfo )
c . volumeIds [ vi . Id ] = append ( c . volumeIds [ vi . Id ] , volumeServer )
// Keep c.volumeServers free of duplicates; the check is a linear scan per volume.
if ! slices . Contains ( c . volumeServers , volumeServer ) {
c . volumeServers = append ( c . volumeServers , volumeServer )
}
}
}
} )
return nil
}
// verifyEntry opens a volume server client with operation.WithVolumeServerClient
// and issues a VolumeNeedleStatus request for the given file id.
// NOTE(review): two variants of the signature and of the first call line appear
// below (pointer address with fileId first, then value address with volumeServer
// first). They look like the before/after sides of a patch — confirm which one
// the applied file keeps. The request fields are elided by the hunk marker.
func ( c * commandFsVerify ) verifyEntry ( fileId * filer_pb . FileId , volumeServer * pb . ServerAddress ) error {
err := operation . WithVolumeServerClient ( false , * volumeServer , c . env . option . GrpcDialOption ,
func ( c * commandFsVerify ) verifyEntry ( volumeServer pb . ServerAddress , fileId * filer_pb . FileId ) error {
err := operation . WithVolumeServerClient ( false , volumeServer , c . env . option . GrpcDialOption ,
func ( client volume_server_pb . VolumeServerClient ) error {
_ , err := client . VolumeNeedleStatus ( context . Background ( ) ,
& volume_server_pb . VolumeNeedleStatusRequest {
@ -104,9 +123,6 @@ func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb.
// An error whose text contains storage.ErrorDeleted's message is not returned;
// any other non-nil error is. The match is on the error string, not on identity.
if err != nil && ! strings . Contains ( err . Error ( ) , storage . ErrorDeleted . Error ( ) ) {
return err
}
// NOTE(review): the hunk marker above shrinks this region by three lines, so this
// verbose progress dot may belong to the pre-change side of the patch — confirm.
if * c . verbose {
fmt . Fprintf ( c . writer , "." )
}
return nil
}
@ -140,27 +156,50 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
func ( outputChan chan interface { } ) {
for itemEntry := range outputChan {
i := itemEntry . ( * ItemEntry )
fileMsg := fmt . Sprintf ( "file:%s needle status " , i . path )
if * c . verbose {
fmt . Fprintf ( c . writer , fileMsg )
fileMsg = ""
}
itemPath := string ( i . path )
fileMsg := fmt . Sprintf ( "file:%s" , itemPath )
errItem := make ( map [ string ] error )
errItemLock := sync . RWMutex { }
for _ , chunk := range i . chunks {
if volumeIds , ok := c . volumeIds [ chunk . Fid . VolumeId ] ; ok {
for _ , volumeServer := range volumeIds {
if err = c . verifyEntry ( chunk . Fid , & volumeServer ) ; err != nil {
fmt . Fprintf ( c . writer , "%sfailed verify %d:%d: %+v\n" ,
fileMsg , chunk . Fid . VolumeId , chunk . Fid . FileKey , err )
break
if * c . concurrency == 0 {
if err = c . verifyEntry ( volumeServer , chunk . Fid ) ; err != nil {
fmt . Fprintf ( c . writer , "%s failed verify needle %d:%d: %+v\n" ,
fileMsg , chunk . Fid . VolumeId , chunk . Fid . FileKey , err )
}
continue
}
c . waitChanLock . RLock ( )
waitChan , ok := c . waitChan [ string ( volumeServer ) ]
c . waitChanLock . RUnlock ( )
if ! ok {
fmt . Fprintf ( c . writer , "%s failed to get channel for %s chunk: %d:%d: %+v\n" ,
string ( volumeServer ) , fileMsg , chunk . Fid . VolumeId , chunk . Fid . FileKey , err )
continue
}
waitChan <- struct { } { }
go func ( fId * filer_pb . FileId , path string , volumeServer pb . ServerAddress , msg string ) {
if err = c . verifyEntry ( volumeServer , fId ) ; err != nil {
errItemLock . Lock ( )
errItem [ path ] = err
fmt . Fprintf ( c . writer , "%s failed verify needle %d:%d: %+v\n" ,
msg , fId . VolumeId , fId . FileKey , err )
errItemLock . Unlock ( )
}
<- waitChan
} ( chunk . Fid , itemPath , volumeServer , fileMsg )
}
} else {
err = fmt . Errorf ( "volumeId %d not found" , chunk . Fid . VolumeId )
fmt . Fprintf ( c . writer , "%sfailed verify chunk %d:%d: %+v\n" ,
fmt . Fprintf ( c . writer , "%s %d:%d: %+v\n" ,
fileMsg , chunk . Fid . VolumeId , chunk . Fid . FileKey , err )
break
}
}
errItemLock . RLock ( )
err , _ = errItem [ itemPath ]
errItemLock . RUnlock ( )
if err != nil {
errCount ++
@ -168,7 +207,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCo
}
if * c . verbose {
fmt . Fprintf ( c . writer , " verifed\n" )
fmt . Fprintf ( c . writer , "%s needles:%d verifed\n" , fileMsg , len ( i . chunks ) )
}
fileCount ++
}