@ -37,6 +37,7 @@ type volumeCheckDisk struct {
verbose bool
applyChanges bool
syncDeletions bool
fixReadOnly bool
nonRepairThreshold float64
}
@ -48,19 +49,27 @@ func (c *commandVolumeCheckDisk) Help() string {
return ` check all replicated volumes to find and fix inconsistencies . It is optional and resource intensive .
How it works :
find all volumes that are replicated
for each volume id , if there are more than 2 replicas , find one pair with the largest 2 in file count .
for the pair volume A and B
bi - directional sync ( default ) : append entries in A and not in B to B , and entries in B and not in A to A
uni - directional sync ( read - only repair ) : only sync from source to target without modifying source
for each writable volume ID , if there are more than 2 replicas , find one pair with the largest 2 in file count
for the pair volume A and B
append entries in A and not in B to B
append entries in B and not in A to A
optionally , for each non - writable volume replica A
if volume is not full
prune late volume entries not matching its index file
select a writable volume replica B
append missing entries from B into A
mark the volume as writable ( healthy )
Options :
- slow : check all replicas even if file counts are the same
- v : verbose mode with detailed progress output
- volumeId : check only a specific volume ID ( 0 for all )
- apply : actually apply the fixes ( default is simulation mode )
- force - readonly : also check and repair read - only volumes using uni - directional sync
- fixReadO nly : also check and repair read - only volumes using uni - directional sync
- syncDeleted : sync deletion records during repair
- nonRepairThreshold : maximum fraction of missing keys allowed for repair ( default 0.3 )
@ -80,7 +89,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChanges := fsckCommand . Bool ( "apply" , false , "apply the fix" )
// TODO: remove this alias
applyChangesAlias := fsckCommand . Bool ( "force" , false , "apply the fix (alias for -apply)" )
forceReado nly := fsckCommand . Bool ( "force-reado nly" , false , "apply the fix even on readonly volumes" )
fixReadO nly := fsckCommand . Bool ( "fixReadO nly" , false , "apply the fix even on readonly volumes (EXPERIMENTAL!) " )
syncDeletions := fsckCommand . Bool ( "syncDeleted" , false , "sync of deletions the fix" )
nonRepairThreshold := fsckCommand . Float64 ( "nonRepairThreshold" , 0.3 , "repair when missing keys is not more than this limit" )
if err = fsckCommand . Parse ( args ) ; err != nil {
@ -103,6 +112,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
verbose : * verbose ,
applyChanges : * applyChanges ,
syncDeletions : * syncDeletions ,
fixReadOnly : * fixReadOnly ,
nonRepairThreshold : * nonRepairThreshold ,
}
@ -123,24 +133,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
}
vcd . write ( "Pass #1 (writeable volumes)\n" )
if err := vcd . checkWriteableVolumes ( volumeReplicas ) ; err != nil {
if err := vcd . checkWritableVolumes ( volumeReplicas ) ; err != nil {
return err
}
if * forceReadonly {
vcd . write ( "Pass #2 (read-only volumes)\n" )
if err := vcd . checkReadOnlyVolumes ( volumeReplicas ) ; err != nil {
return err
}
if err := vcd . checkReadOnlyVolumes ( volumeReplicas ) ; err != nil {
return err
}
return nil
}
// checkWriteableVolumes fixes volume replicas which are not read-only.
func ( vcd * volumeCheckDisk ) checkWriteableVolumes ( volumeReplicas map [ uint32 ] [ ] * VolumeReplica ) error {
// pick 1 pairs of volume replica
// checkWritableVolumes fixes volume replicas which are not read-only.
func ( vcd * volumeCheckDisk ) checkWritableVolumes ( volumeReplicas map [ uint32 ] [ ] * VolumeReplica ) error {
vcd . write ( "Pass #1 (writable volumes)\n" )
for _ , replicas := range volumeReplicas {
// filter readonly replica
var writableReplicas [ ] * VolumeReplica
@ -157,16 +163,14 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
} )
for len ( writableReplicas ) >= 2 {
a , b := writableReplicas [ 0 ] , writableReplicas [ 1 ]
if ! vcd . slowMode {
shouldSkip , err := vcd . shouldSkipVolume ( a , b )
if err != nil {
vcd . write ( "error checking if volume %d should be skipped: %v\n" , a . info . Id , err )
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append ( writableReplicas [ : 1 ] , writableReplicas [ 2 : ] ... )
continue
}
shouldSkip , err := vcd . shouldSkipVolume ( a , b )
if err != nil {
vcd . write ( "error checking if volume %d should be skipped: %v\n" , a . info . Id , err )
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append ( writableReplicas [ : 1 ] , writableReplicas [ 2 : ] ... )
continue
}
if err := vcd . syncTwoReplicas ( a , b , true ) ; err != nil {
vcd . write ( "sync volume %d on %s and %s: %v\n" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
@ -183,9 +187,107 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
return nil
}
// checkReadOnlyVolumes fixes read-only volume replicas.
// makeVolumeWritable asks the volume server hosting replica vr to mark volume
// vid as writable.
//
// When applyChanges is false (simulation mode) no request is sent and nil is
// returned. Otherwise the error from the VolumeMarkWritable call, if any, is
// returned unchanged; on success a confirmation line is emitted via vcd.write.
func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) error {
	if !vcd.applyChanges {
		return nil
	}

	node := vr.location.dataNode
	markWritable := func(client volume_server_pb.VolumeServerClient) error {
		req := &volume_server_pb.VolumeMarkWritableRequest{VolumeId: vid}
		_, rpcErr := client.VolumeMarkWritable(context.Background(), req)
		return rpcErr
	}

	if err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(node), vcd.grpcDialOption(), markWritable); err != nil {
		return err
	}

	vcd.write("volume %d on %s is now writable\n", vid, node.Id)
	return nil
}
// makeVolumeReadonly asks the volume server hosting replica vr to mark volume
// vid as read-only.
//
// When applyChanges is false (simulation mode) no request is sent and nil is
// returned. Otherwise the error from the VolumeMarkReadonly call, if any, is
// returned unchanged; on success a confirmation line is emitted via vcd.write.
func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) error {
	if !vcd.applyChanges {
		return nil
	}

	node := vr.location.dataNode
	markReadonly := func(client volume_server_pb.VolumeServerClient) error {
		req := &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: vid}
		_, rpcErr := client.VolumeMarkReadonly(context.Background(), req)
		return rpcErr
	}

	if err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(node), vcd.grpcDialOption(), markReadonly); err != nil {
		return err
	}

	vcd.write("volume %d on %s is now read-only\n", vid, node.Id)
	return nil
}
// checkReadOnlyVolumes attempts to repair read-only volume replicas using a
// writable replica of the same volume as the source. It does nothing unless
// fixReadOnly is set.
//
// For each volume that has at least one read-only replica and at least one
// writable replica, every read-only replica that shouldSkipVolume does not rule
// out is marked writable and then synced from the writable source. If the sync
// fails, the replica is switched back to read-only and the sync error is
// returned, which ends the pass. Volumes with no read-only replica, or with no
// writable replica to copy from, are reported and skipped.
//
// Returns the first error from marking a replica writable, from syncing it, or
// from reverting it to read-only; nil otherwise.
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
	if !vcd.fixReadOnly {
		return nil
	}
	vcd.write("Pass #2 (read-only volumes)\n")

	for vid, replicas := range volumeReplicas {
		var source *VolumeReplica
		var roReplicas []*VolumeReplica
		for _, r := range replicas {
			if r.info.ReadOnly {
				roReplicas = append(roReplicas, r)
			} else {
				// we assume all writable replicas are identical by this point, after the checkWritableVolumes() pass.
				source = r
			}
		}

		if len(roReplicas) == 0 {
			vcd.write("no read-only replicas for volume %d\n", vid)
			continue
		}
		if source == nil {
			vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
			continue
		}

		// attempt to fix read-only replicas from the known good source
		for _, r := range roReplicas {
			// TODO: skip full readonly volumes.
			skip, err := vcd.shouldSkipVolume(r, source)
			if err != nil {
				vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
				continue
			}
			if skip {
				continue
			}

			// make volume writable...
			if err := vcd.makeVolumeWritable(vid, r); err != nil {
				return err
			}

			// ...fix it...
			// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
			if err := vcd.syncTwoReplicas(source, r, false); err != nil {
				vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)

				// ...or revert it back to read-only, if something went wrong.
				// makeVolumeReadonly reports the state change itself when it
				// actually happens, so no extra message is written here.
				if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
					return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %w", vid, r.location.dataNode.Id, err, roErr)
				}
				return err
			}
		}
	}

	return nil
}
func ( vcd * volumeCheckDisk ) grpcDialOption ( ) grpc . DialOption {
@ -260,6 +362,11 @@ func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool,
// Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated.
// The Do method logs these errors and continues processing to ensure other volumes are checked.
func ( vcd * volumeCheckDisk ) shouldSkipVolume ( a , b * VolumeReplica ) ( bool , error ) {
if vcd . slowMode {
// never skip volumes on slow mode
return false , nil
}
pulseTimeAtSecond := vcd . now . Add ( - constants . VolumePulsePeriod * 2 ) . Unix ( )
doSyncDeletedCount := false
if vcd . syncDeletions && a . info . DeleteCount != b . info . DeleteCount {