@ -22,8 +22,7 @@ func init() {
}
type commandVolumeCheckDisk struct {
env * CommandEnv
syncDeletions * bool
env * CommandEnv
}
func ( c * commandVolumeCheckDisk ) Name ( ) string {
@ -51,7 +50,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
verbose := fsckCommand . Bool ( "v" , false , "verbose mode" )
volumeId := fsckCommand . Uint ( "volumeId" , 0 , "the volume id" )
applyChanges := fsckCommand . Bool ( "force" , false , "apply the fix" )
c . syncDeletions = fsckCommand . Bool ( "syncDeleted" , false , "sync of deletions the fix" )
syncDeletions : = fsckCommand . Bool ( "syncDeleted" , false , "sync of deletions the fix" )
nonRepairThreshold := fsckCommand . Float64 ( "nonRepairThreshold" , 0.3 , "repair when missing keys is not more than this limit" )
if err = fsckCommand . Parse ( args ) ; err != nil {
return nil
@ -97,7 +96,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
continue
}
if err := c . syncTwoReplicas ( a , b , * applyChanges , * nonRepairThreshold , * verbose , writer ) ; err != nil {
if err := c . syncTwoReplicas ( a , b , * applyChanges , * syncDeletions , * nonRepairThreshold , * verbose , writer ) ; err != nil {
fmt . Fprintf ( writer , "sync volume %d on %s and %s: %v\n" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
}
replicas = replicas [ 1 : ]
@ -107,17 +106,17 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
func ( c * commandVolumeCheckDisk ) syncTwoReplicas ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , nonRepairThreshold float64 , verbose bool , writer io . Writer ) ( err error ) {
func ( c * commandVolumeCheckDisk ) syncTwoReplicas ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , verbose bool , writer io . Writer ) ( err error ) {
aHasChanges , bHasChanges := true , true
for aHasChanges || bHasChanges {
if aHasChanges , bHasChanges , err = c . checkBoth ( a , b , applyChanges , nonRepairThreshold , verbose , writer ) ; err != nil {
if aHasChanges , bHasChanges , err = c . checkBoth ( a , b , applyChanges , doSyncDeletions , nonRepairThreshold , verbose , writer ) ; err != nil {
return err
}
}
return nil
}
func ( c * commandVolumeCheckDisk ) checkBoth ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , nonRepairThreshold float64 , verbose bool , writer io . Writer ) ( aHasChanges bool , bHasChanges bool , err error ) {
func ( c * commandVolumeCheckDisk ) checkBoth ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , verbose bool , writer io . Writer ) ( aHasChanges bool , bHasChanges bool , err error ) {
aDB , bDB := needle_map . NewMemDb ( ) , needle_map . NewMemDb ( )
defer func ( ) {
aDB . Close ( )
@ -126,24 +125,24 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
// read index db
readIndexDbCutoffFrom := uint64 ( time . Now ( ) . UnixNano ( ) )
if err = c . readIndexDatabase ( aDB , a . info . Collection , a . info . Id , pb . NewServerAddressFromDataNode ( a . location . dataNode ) , verbose , writer ) ; err != nil {
if err = readIndexDatabase ( aDB , a . info . Collection , a . info . Id , pb . NewServerAddressFromDataNode ( a . location . dataNode ) , verbose , writer , c . env . option . GrpcDialOption ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , a . location . dataNode , a . info . Id , err )
}
if err := c . readIndexDatabase ( bDB , b . info . Collection , b . info . Id , pb . NewServerAddressFromDataNode ( b . location . dataNode ) , verbose , writer ) ; err != nil {
if err := readIndexDatabase ( bDB , b . info . Collection , b . info . Id , pb . NewServerAddressFromDataNode ( b . location . dataNode ) , verbose , writer , c . env . option . GrpcDialOption ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , b . location . dataNode , b . info . Id , err )
}
// find and make up the differences
if aHasChanges , err = c . doVolumeCheckDisk ( bDB , aDB , b , a , verbose , writer , applyChanges , nonRepairThreshold , readIndexDbCutoffFrom ) ; err != nil {
if aHasChanges , err = doVolumeCheckDisk ( bDB , aDB , b , a , verbose , writer , applyChanges , doSyncDeletions , nonRepairThreshold , readIndexDbCutoffFrom , c . env . option . GrpcDialOption ) ; err != nil {
return true , true , fmt . Errorf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , b . location . dataNode . Id , a . location . dataNode . Id , b . info . Id , err )
}
if bHasChanges , err = c . doVolumeCheckDisk ( aDB , bDB , a , b , verbose , writer , applyChanges , nonRepairThreshold , readIndexDbCutoffFrom ) ; err != nil {
if bHasChanges , err = doVolumeCheckDisk ( aDB , bDB , a , b , verbose , writer , applyChanges , doSyncDeletions , nonRepairThreshold , readIndexDbCutoffFrom , c . env . option . GrpcDialOption ) ; err != nil {
return true , true , fmt . Errorf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , a . location . dataNode . Id , b . location . dataNode . Id , a . info . Id , err )
}
return
}
func ( c * commandVolumeCheckDisk ) doVolumeCheckDisk ( minuend , subtrahend * needle_map . MemDb , source , target * VolumeReplica , verbose bool , writer io . Writer , applyChanges bool , nonRepairThreshold float64 , cutoffFromAtNs uint64 ) ( hasChanges bool , err error ) {
func doVolumeCheckDisk ( minuend , subtrahend * needle_map . MemDb , source , target * VolumeReplica , verbose bool , writer io . Writer , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , cutoffFromAtNs uint64 , grpcDialOption grpc . DialOption ) ( hasChanges bool , err error ) {
// find missing keys
// hash join, can be more efficient
@ -158,7 +157,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
return nil
}
if doCutoffOfLastNeedle {
if needleMeta , err := readNeedleMeta ( c . env . option . G rpcDialOption, pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , minuendValue ) ; err == nil {
if needleMeta , err := readNeedleMeta ( g rpcDialOption, pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , minuendValue ) ; err == nil {
// needles older than the cutoff time are not missing yet
if needleMeta . AppendAtNs > cutoffFromAtNs {
return nil
@ -193,7 +192,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
}
for _ , needleValue := range missingNeedles {
needleBlob , err := readSourceNeedleBlob ( c . env . option . G rpcDialOption, pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , needleValue )
needleBlob , err := readSourceNeedleBlob ( g rpcDialOption, pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , needleValue )
if err != nil {
return hasChanges , err
}
@ -208,13 +207,13 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
hasChanges = true
if err = c . writeNeedleBlobToTarget ( pb . NewServerAddressFromDataNode ( target . location . dataNode ) , source . info . Id , needleValue , needleBlob ) ; err != nil {
if err = writeNeedleBlobToTarget ( grpcDialOption , pb . NewServerAddressFromDataNode ( target . location . dataNode ) , source . info . Id , needleValue , needleBlob ) ; err != nil {
return hasChanges , err
}
}
if * c . s yncDeletions && len ( partiallyDeletedNeedles ) > 0 {
if doS yncDeletions && len ( partiallyDeletedNeedles ) > 0 {
var fidList [ ] string
for _ , needleValue := range partiallyDeletedNeedles {
fidList = append ( fidList , needleValue . Key . FileId ( source . info . Id ) )
@ -224,7 +223,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
}
deleteResults , deleteErr := operation . DeleteFilesAtOneVolumeServer (
pb . NewServerAddressFromDataNode ( target . location . dataNode ) ,
c . env . option . G rpcDialOption, fidList , false )
g rpcDialOption, fidList , false )
if deleteErr != nil {
return hasChanges , deleteErr
}
@ -255,9 +254,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.
return
}
func ( c * commandVolumeCheckDisk ) writeNeedleBlobToTarget ( targetVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue , needleBlob [ ] byte ) error {
func writeNeedleBlobToTarget ( grpcDialOption grpc . DialOption , targetVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue , needleBlob [ ] byte ) error {
return operation . WithVolumeServerClient ( false , targetVolumeServer , c . env . option . G rpcDialOption, func ( client volume_server_pb . VolumeServerClient ) error {
return operation . WithVolumeServerClient ( false , targetVolumeServer , g rpcDialOption, func ( client volume_server_pb . VolumeServerClient ) error {
_ , err := client . WriteNeedleBlob ( context . Background ( ) , & volume_server_pb . WriteNeedleBlobRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( needleValue . Key ) ,
@ -269,10 +268,10 @@ func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.S
}
func ( c * commandVolumeCheckDisk ) readIndexDatabase ( db * needle_map . MemDb , collection string , volumeId uint32 , volumeServer pb . ServerAddress , verbose bool , writer io . Writer ) error {
func readIndexDatabase ( db * needle_map . MemDb , collection string , volumeId uint32 , volumeServer pb . ServerAddress , verbose bool , writer io . Writer , grpcDialOption grpc . DialOption ) error {
var buf bytes . Buffer
if err := c . copyVolumeIndexFile ( collection , volumeId , volumeServer , & buf , verbose , writer ) ; err != nil {
if err := copyVolumeIndexFile ( collection , volumeId , volumeServer , & buf , verbose , writer , grpcDialOption ) ; err != nil {
return err
}
@ -282,9 +281,9 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
return db . LoadFilterFromReaderAt ( bytes . NewReader ( buf . Bytes ( ) ) , true , false )
}
func ( c * commandVolumeCheckDisk ) copyVolumeIndexFile ( collection string , volumeId uint32 , volumeServer pb . ServerAddress , buf * bytes . Buffer , verbose bool , writer io . Writer ) error {
func copyVolumeIndexFile ( collection string , volumeId uint32 , volumeServer pb . ServerAddress , buf * bytes . Buffer , verbose bool , writer io . Writer , grpcDialOption grpc . DialOption ) error {
return operation . WithVolumeServerClient ( true , volumeServer , c . env . option . G rpcDialOption, func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
return operation . WithVolumeServerClient ( true , volumeServer , g rpcDialOption, func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
ext := ".idx"