@ -10,6 +10,8 @@ import (
"math"
"math/rand/v2"
"net/http"
"strings"
"sync"
"time"
"slices"
@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{}
type volumeCheckDisk struct {
commandEnv * CommandEnv
writer io . Writer
writerMu sync . Mutex
now time . Time
slowMode bool
@ -149,14 +152,14 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
// checkWritableVolumes fixes volume replicas which are not read-only.
func ( vcd * volumeCheckDisk ) checkWritableVolumes ( volumeReplicas map [ uint32 ] [ ] * VolumeReplica ) error {
vcd . write ( "Pass #1 (writable volumes)\n " )
vcd . write ( "Pass #1 (writable volumes)" )
for _ , replicas := range volumeReplicas {
// filter readonly replica
var writableReplicas [ ] * VolumeReplica
for _ , replica := range replicas {
if replica . info . ReadOnly {
vcd . write ( "skipping readonly volume %d on %s\n " , replica . info . Id , replica . location . dataNode . Id )
vcd . write ( "skipping readonly volume %d on %s" , replica . info . Id , replica . location . dataNode . Id )
} else {
writableReplicas = append ( writableReplicas , replica )
}
@ -169,7 +172,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
a , b := writableReplicas [ 0 ] , writableReplicas [ 1 ]
shouldSkip , err := vcd . shouldSkipVolume ( a , b )
if err != nil {
vcd . write ( "error checking if volume %d should be skipped: %v\n " , a . info . Id , err )
vcd . write ( "error checking if volume %d should be skipped: %v" , a . info . Id , err )
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
@ -177,7 +180,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo
continue
}
if err := vcd . syncTwoReplicas ( a , b , true ) ; err != nil {
vcd . write ( "sync volume %d on %s and %s: %v\n " , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
vcd . write ( "sync volume %d on %s and %s: %v" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
}
// always choose the larger volume to be the source
if a . info . FileCount > b . info . FileCount {
@ -207,7 +210,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er
return err
}
vcd . write ( "volume %d on %s is now writable\n " , vid , vr . location . dataNode . Id )
vcd . write ( "volume %d on %s is now writable" , vid , vr . location . dataNode . Id )
return nil
}
@ -227,7 +230,7 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
return err
}
vcd . write ( "volume %d on %s is now read-only\n " , vid , vr . location . dataNode . Id )
vcd . write ( "volume %d on %s is now read-only" , vid , vr . location . dataNode . Id )
return nil
}
@ -235,7 +238,7 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
if ! vcd . fixReadOnly {
return
}
vcd . write ( "Pass #2 (read-only volumes)\n " )
vcd . write ( "Pass #2 (read-only volumes)" )
for vid , replicas := range volumeReplicas {
roReplicas := [ ] * VolumeReplica { }
@ -249,11 +252,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
}
}
if len ( roReplicas ) == 0 {
vcd . write ( "no read-only replicas for volume %d\n " , vid )
vcd . write ( "no read-only replicas for volume %d" , vid )
continue
}
if len ( rwReplicas ) == 0 {
vcd . write ( "got %d read-only replicas for volume %d and no writable replicas to fix from\n " , len ( roReplicas ) , vid )
vcd . write ( "got %d read-only replicas for volume %d and no writable replicas to fix from" , len ( roReplicas ) , vid )
continue
}
@ -303,12 +306,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
}
func ( vcd * volumeCheckDisk ) write ( format string , a ... any ) {
fmt . Fprintf ( vcd . writer , format , a ... )
vcd . writerMu . Lock ( )
defer vcd . writerMu . Unlock ( )
fmt . Fprintf ( vcd . writer , strings . TrimRight ( format , "\r\n " ) , a ... )
fmt . Fprint ( vcd . writer , "\n" )
}
func ( vcd * volumeCheckDisk ) writeVerbose ( format string , a ... any ) {
if vcd . verbose {
fmt . Fprintf ( vcd . writer , format , a ... )
vcd . write ( format , a ... )
}
}
@ -394,7 +400,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
if doSyncDeletedCount && ! eqDeletedFileCount {
return false , nil
}
vcd . writeVerbose ( "skipping active volumes %d with the same file counts on %s and %s\n " ,
vcd . writeVerbose ( "skipping active volumes %d with the same file counts on %s and %s" ,
a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id )
} else {
return false , nil
@ -412,7 +418,7 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi
for ( sourceHasChanges || targetHasChanges ) && iteration < maxIterations {
iteration ++
vcd . writeVerbose ( "sync iteration %d/%d for volume %d\n " , iteration , maxIterations , source . info . Id )
vcd . writeVerbose ( "sync iteration %d/%d for volume %d" , iteration , maxIterations , source . info . Id )
prevSourceHasChanges , prevTargetHasChanges := sourceHasChanges , targetHasChanges
if sourceHasChanges , targetHasChanges , err = vcd . checkBoth ( source , target , bidi ) ; err != nil {
@ -421,14 +427,14 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && ( sourceHasChanges || targetHasChanges ) {
vcd . write ( "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n " ,
vcd . write ( "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop" ,
source . info . Id , source . location . dataNode . Id , target . location . dataNode . Id , iteration )
return fmt . Errorf ( "sync not making progress after %d iterations" , iteration )
}
}
if iteration >= maxIterations && ( sourceHasChanges || targetHasChanges ) {
vcd . write ( "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n " ,
vcd . write ( "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention" ,
source . info . Id , maxIterations , source . location . dataNode . Id , target . location . dataNode . Id )
return fmt . Errorf ( "reached maximum sync iterations (%d)" , maxIterations )
}
@ -518,7 +524,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
return nil
} )
vcd . write ( "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n " ,
vcd . write ( "volume %d %s has %d entries, %s missed %d and partially deleted %d entries" ,
source . info . Id , source . location . dataNode . Id , counter , target . location . dataNode . Id , len ( missingNeedles ) , len ( partiallyDeletedNeedles ) )
if counter == 0 || ( len ( missingNeedles ) == 0 && len ( partiallyDeletedNeedles ) == 0 ) {
@ -542,7 +548,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
continue
}
vcd . writeVerbose ( "read %s %s => %s\n " , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
vcd . writeVerbose ( "read %s %s => %s" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
hasChanges = true
if err = vcd . writeNeedleBlobToTarget ( pb . NewServerAddressFromDataNode ( target . location . dataNode ) , source . info . Id , needleValue , needleBlob ) ; err != nil {
@ -555,7 +561,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me
var fidList [ ] string
for _ , needleValue := range partiallyDeletedNeedles {
fidList = append ( fidList , needleValue . Key . FileId ( source . info . Id ) )
vcd . writeVerbose ( "delete %s %s => %s\n " , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
vcd . writeVerbose ( "delete %s %s => %s" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
}
deleteResults := operation . DeleteFileIdsAtOneVolumeServer (
pb . NewServerAddressFromDataNode ( target . location . dataNode ) ,
@ -610,7 +616,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s
return err
}
vcd . writeVerbose ( "load collection %s volume %d index size %d from %s ...\n " , collection , volumeId , buf . Len ( ) , volumeServer )
vcd . writeVerbose ( "load collection %s volume %d index size %d from %s ..." , collection , volumeId , buf . Len ( ) , volumeServer )
return db . LoadFilterFromReaderAt ( bytes . NewReader ( buf . Bytes ( ) ) , true , false )
}