@ -3,11 +3,13 @@ package shell
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"math"
"net/http"
"strings"
"time"
"slices"
@ -158,7 +160,7 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
continue
}
}
if err := vcd . syncTwoReplicas ( a , b ) ; err != nil {
if err := vcd . syncTwoReplicas ( a , b , true ) ; err != nil {
vcd . write ( "sync volume %d on %s and %s: %v\n" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
}
// always choose the larger volume to be the source
@ -282,62 +284,81 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
return true , nil
}
func ( vcd * volumeCheckDisk ) syncTwoReplicas ( a * VolumeReplica , b * VolumeReplica ) ( err error ) {
aHasChanges , bHasChanges := true , true
// syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode
// is enabled, changes from target are also synced back into the source.
func ( vcd * volumeCheckDisk ) syncTwoReplicas ( source , target * VolumeReplica , bidi bool ) ( err error ) {
sourceHasChanges , targetHasChanges := true , true
const maxIterations = 5
iteration := 0
for ( aHasChanges || b HasChanges) && iteration < maxIterations {
for ( sourceHasChanges || target HasChanges) && iteration < maxIterations {
iteration ++
vcd . writeVerbose ( "sync iteration %d for volume %d\n" , iteration , a . info . Id )
vcd . writeVerbose ( "sync iteration %d/%d for volume %d\n" , iteration , m axIterations , source . info . Id )
prevAHasChanges , prevBHasChanges := aHasChanges , b HasChanges
if aHasChanges , b HasChanges, err = vcd . checkBoth ( a , b ) ; err != nil {
prevSourceHasChanges , prevDestHasChanges := sourceHasChanges , target HasChanges
if sourceHasChanges , target HasChanges, err = vcd . checkBoth ( source , target , bidi ) ; err != nil {
return err
}
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && ( aHasChanges || b HasChanges ) {
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevDestHasChanges == targetHasChanges && ( sourceHasChanges || target HasChanges ) {
vcd . write ( "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n" ,
a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , iteration )
source . info . Id , source . location . dataNode . Id , target . location . dataNode . Id , iteration )
return fmt . Errorf ( "sync not making progress after %d iterations" , iteration )
}
}
if iteration >= maxIterations && ( aHasChanges || b HasChanges) {
if iteration >= maxIterations && ( sourceHasChanges || target HasChanges) {
vcd . write ( "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n" ,
a . info . Id , maxIterations , a . location . dataNode . Id , b . location . dataNode . Id )
source . info . Id , maxIterations , source . location . dataNode . Id , target . location . dataNode . Id )
return fmt . Errorf ( "reached maximum sync iterations (%d)" , maxIterations )
}
return nil
}
func ( vcd * volumeCheckDisk ) checkBoth ( a * VolumeReplica , b * VolumeReplica ) ( aHasChanges bool , bHasChanges bool , err error ) {
aDB , bDB := needle_map . NewMemDb ( ) , needle_map . NewMemDb ( )
// checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source.
// Returns whether the source and/or target were modified.
func ( vcd * volumeCheckDisk ) checkBoth ( source , target * VolumeReplica , bidi bool ) ( sourceHasChanges bool , targetHasChanges bool , err error ) {
sourceDB , targetDB := needle_map . NewMemDb ( ) , needle_map . NewMemDb ( )
if sourceDB == nil || targetDB == nil {
return false , false , fmt . Errorf ( "failed to allocate in-memory needle DBs" )
}
defer func ( ) {
aDB . Close ( )
bDB . Close ( )
source DB. Close ( )
target DB. Close ( )
} ( )
// read index db
if err = vcd . readIndexDatabase ( aDB , a . info . Collection , a . info . Id , pb . NewServerAddressFromDataNode ( a . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , a . location . dataNode , a . info . Id , err )
if err = vcd . readIndexDatabase ( sourceDB , source . info . Collection , source . info . Id , pb . NewServerAddressFromDataNode ( source . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , source . location . dataNode . Id , source . info . Id , err )
}
if err := vcd . readIndexDatabase ( bDB , b . info . Collection , b . info . Id , pb . NewServerAddressFromDataNode ( b . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , b . location . dataNode , b . info . Id , err )
if err := vcd . readIndexDatabase ( targetDB , target . info . Collection , target . info . Id , pb . NewServerAddressFromDataNode ( target . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , target . location . dataNode . Id , target . info . Id , err )
}
// find and make up the differences
aHasChanges , err1 := vcd . doVolumeCheckDisk ( bDB , aDB , b , a )
bHasChanges , err2 := vcd . doVolumeCheckDisk ( aDB , bDB , a , b )
if err1 != nil {
return aHasChanges , bHasChanges , fmt . Errorf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , b . location . dataNode . Id , a . location . dataNode . Id , b . info . Id , err1 )
errStrs := [ ] string { }
targetHasChanges , err = vcd . doVolumeCheckDisk ( sourceDB , targetDB , source , target )
if err != nil {
errStrs = append (
errStrs ,
fmt . Sprintf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , target . location . dataNode . Id , source . location . dataNode . Id , target . info . Id , err ) )
}
sourceHasChanges = false
if bidi {
sourceHasChanges , err = vcd . doVolumeCheckDisk ( targetDB , sourceDB , target , source )
if err != nil {
errStrs = append (
errStrs ,
fmt . Sprintf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , source . location . dataNode . Id , target . location . dataNode . Id , source . info . Id , err ) )
}
}
if err2 != nil {
return aHasChanges , bHasChanges , fmt . Errorf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , a . location . dataNode . Id , b . location . dataNode . Id , a . info . Id , err2 )
if len ( errStrs ) != 0 {
return sourceHasChanges , targetHasChanges , errors . New ( strings . Join ( errStrs , ", " ) )
}
return aHasChanges , bHasChanges , nil
return sourceHasChanges , targetHasChanges , nil
}
func ( vcd * volumeCheckDisk ) doVolumeCheckDisk ( minuend , subtrahend * needle_map . MemDb , source , target * VolumeReplica ) ( hasChanges bool , err error ) {