@ -8,7 +8,6 @@ import (
"io"
"math"
"net/http"
"sync"
"time"
"slices"
@ -26,9 +25,18 @@ func init() {
Commands = append ( Commands , & commandVolumeCheckDisk { } )
}
type commandVolumeCheckDisk struct {
env * CommandEnv
type commandVolumeCheckDisk struct { }
// volumeCheckDisk holds the state of one volume.check.disk run: the shell
// environment, the output destination and the parsed command-line options.
// Do builds a single instance and calls the checking/syncing methods on it.
type volumeCheckDisk struct {
// commandEnv is the shell environment handed to Do; it supplies the lock
// state (isLocked) and the gRPC dial option (grpcDialOption).
commandEnv * CommandEnv
// writer receives everything printed through write and writeVerbose.
writer io . Writer
// now is captured once when Do creates the struct. shouldSkipVolume derives
// the pulse cutoff from it and doVolumeCheckDisk uses it as the needle cutoff.
now time . Time
// slowMode, when true, makes Do bypass shouldSkipVolume and always sync the pair.
slowMode bool
// verbose enables the output emitted through writeVerbose.
verbose bool
// applyChanges, when false, makes doVolumeCheckDisk read missing needles
// without writing them to the target; deletions are not propagated either.
applyChanges bool
// syncDeletions makes shouldSkipVolume compare delete counts and lets
// doVolumeCheckDisk propagate partially deleted needles to the target.
syncDeletions bool
// nonRepairThreshold is the largest fraction of missing needles (missing /
// visited) that doVolumeCheckDisk repairs; above it an error is returned.
nonRepairThreshold float64
}
func ( c * commandVolumeCheckDisk ) Name ( ) string {
@ -53,67 +61,6 @@ func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool {
return tag == ResourceHeavy
}
// getVolumeStatusFileCount queries the volume server running on dn for the
// status of volume vid and returns the total and deleted file counts it reports.
//
// A failed call is not returned to the caller: it is printed to c.writer and
// whatever counts were captured (zero when no response arrived) are returned.
func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
	server := pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort))

	// The callback stores the counts in the named results so they survive the closure.
	queryStatus := func(client volume_server_pb.VolumeServerClient) error {
		status, statusErr := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
			VolumeId: vid,
		})
		if status != nil {
			totalFileCount, deletedFileCount = status.FileCount, status.FileDeletedCount
		}
		return statusErr
	}

	if err := operation.WithVolumeServerClient(false, server, c.env.option.GrpcDialOption, queryStatus); err != nil {
		fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err)
	}
	return totalFileCount, deletedFileCount
}
// eqVolumeFileCount fetches the current file counts of replicas a and b from
// their volume servers and compares them.
//
// The first result reports whether the total file counts are equal, the second
// whether the deleted file counts are equal.
func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
	var (
		pending            sync.WaitGroup
		filesA, filesB     uint64
		deletedA, deletedB uint64
	)

	// Query both servers at the same time so the two readings are taken as
	// close together as possible.
	pending.Add(2)
	go func() {
		defer pending.Done()
		filesA, deletedA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
	}()
	go func() {
		defer pending.Done()
		filesB, deletedB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
	}()
	pending.Wait()

	sameFileCount := filesA == filesB
	sameDeletedCount := deletedA == deletedB
	return sameFileCount, sameDeletedCount
}
// shouldSkipVolume reports whether the sync of replicas a and b can be skipped.
//
// The pair is skipped when the reported file counts match (and, with
// syncDeletions, the delete counts too). When they differ, the pair is synced
// if either replica was last modified before pulseTime; otherwise the current
// counts are fetched from the volume servers and the pair is skipped only if
// those agree.
func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool {
	cutoffSecond := pulseTime.Unix()
	deleteCountsDiffer := syncDeletions && a.info.DeleteCount != b.info.DeleteCount

	// Nothing differs in the reported numbers: no sync needed.
	if a.info.FileCount == b.info.FileCount && !deleteCountsDiffer {
		return true
	}

	// Do synchronization of volumes, if the modification time was before the last pulsation time
	if a.info.ModifiedAtSecond < cutoffSecond || b.info.ModifiedAtSecond < cutoffSecond {
		return false
	}

	// Both replicas changed recently; compare the live counts instead.
	sameFiles, sameDeleted := c.eqVolumeFileCount(a, b)
	if !sameFiles {
		return false
	}
	if deleteCountsDiffer && !sameDeleted {
		return false
	}

	if verbose {
		fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n",
			a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
	}
	return true
}
func ( c * commandVolumeCheckDisk ) Do ( args [ ] string , commandEnv * CommandEnv , writer io . Writer ) ( err error ) {
fsckCommand := flag . NewFlagSet ( c . Name ( ) , flag . ContinueOnError )
@ -135,11 +82,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err = commandEnv . confirmIsLocked ( args ) ; err != nil {
return
}
c . env = commandEnv
c . writer = writer
vcd := & volumeCheckDisk {
commandEnv : commandEnv ,
writer : writer ,
now : time . Now ( ) ,
slowMode : * slowMode ,
verbose : * verbose ,
applyChanges : * applyChanges ,
syncDeletions : * syncDeletions ,
nonRepairThreshold : * nonRepairThreshold ,
}
// collect topology information
pulseTime := time . Now ( ) . Add ( - constants . VolumePulsePeriod * 2 )
topologyInfo , _ , err := collectTopologyInfo ( commandEnv , 0 )
if err != nil {
return err
@ -155,7 +111,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
var writableReplicas [ ] * VolumeReplica
for _ , replica := range replicas {
if replica . info . ReadOnly {
fmt . Fprintf ( writer , "skipping readonly volume %d on %s\n" , replica . info . Id , replica . location . dataNode . Id )
vcd . write ( "skipping readonly volume %d on %s\n" , replica . info . Id , replica . location . dataNode . Id )
} else {
writableReplicas = append ( writableReplicas , replica )
}
@ -166,13 +122,19 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
} )
for len ( writableReplicas ) >= 2 {
a , b := writableReplicas [ 0 ] , writableReplicas [ 1 ]
if ! * slowMode && c . shouldSkipVolume ( a , b , pulseTime , * syncDeletions , * verbose ) {
if ! vcd . slowMode {
shouldSkip , err := vcd . shouldSkipVolume ( a , b )
if err != nil {
vcd . write ( "error checking if volume %d should be skipped: %v\n" , a . info . Id , err )
// Continue with sync despite error to be safe
} else if shouldSkip {
// always choose the larger volume to be the source
writableReplicas = append ( replicas [ : 1 ] , writableReplicas [ 2 : ] ... )
writableReplicas = append ( writableReplicas [ : 1 ] , writableReplicas [ 2 : ] ... )
continue
}
if err := c . syncTwoReplicas ( a , b , * applyChanges , * syncDeletions , * nonRepairThreshold , * verbose ) ; err != nil {
fmt . Fprintf ( writer , "sync volume %d on %s and %s: %v\n" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
}
if err := vcd . syncTwoReplicas ( a , b ) ; err != nil {
vcd . write ( "sync volume %d on %s and %s: %v\n" , a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , err )
}
// always choose the larger volume to be the source
if a . info . FileCount > b . info . FileCount {
@ -186,32 +148,134 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
func ( c * commandVolumeCheckDisk ) syncTwoReplicas ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , verbose bool ) ( err error ) {
// isLocked forwards to the isLocked method of the wrapped command environment
// and returns its answer unchanged.
func (vcd *volumeCheckDisk) isLocked() bool {
	env := vcd.commandEnv
	return env.isLocked()
}
// grpcDialOption returns the gRPC dial option configured on the wrapped command
// environment; every volume-server call in this file goes through it.
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
	env := vcd.commandEnv
	return env.option.GrpcDialOption
}
// write formats a message Printf-style and sends it to vcd.writer.
// The byte count and write error are intentionally discarded.
func (vcd *volumeCheckDisk) write(format string, a ...any) {
	_, _ = fmt.Fprintf(vcd.writer, format, a...)
}
// writeVerbose behaves like write but produces output only when the verbose
// option is set; otherwise it returns without touching vcd.writer.
func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) {
	if !vcd.verbose {
		return
	}
	fmt.Fprintf(vcd.writer, format, a...)
}
// getVolumeStatusFileCount asks the volume server on data node dn for the
// status of volume vid over gRPC.
//
// It returns the total and deleted file counts from the response together with
// the error of the call. The counts stay zero when no response was received.
func (vcd *volumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64, err error) {
	server := pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort))

	// The callback writes into the named results so the counts outlive the closure.
	fetchStatus := func(client volume_server_pb.VolumeServerClient) error {
		status, rpcErr := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
			VolumeId: vid,
		})
		if status != nil {
			totalFileCount, deletedFileCount = status.FileCount, status.FileDeletedCount
		}
		return rpcErr
	}

	err = operation.WithVolumeServerClient(false, server, vcd.grpcDialOption(), fetchStatus)
	return totalFileCount, deletedFileCount, err
}
// eqVolumeFileCount fetches the current file counts of replicas a and b from
// their volume servers, one after the other (a first), and compares them.
//
// Returns:
//   - bool: true if the total file counts are equal
//   - bool: true if the deleted file counts are equal
//   - error: the first failure from getVolumeStatusFileCount, wrapped with %w
//     and annotated with the volume ID and data node; both bools are false then
//
// A failure on replica a returns immediately, so replica b is not queried.
func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool, error) {
	var files, deleted [2]uint64

	for i, replica := range [2]*VolumeReplica{a, b} {
		var err error
		files[i], deleted[i], err = vcd.getVolumeStatusFileCount(replica.info.Id, replica.location.dataNode)
		if err != nil {
			return false, false, fmt.Errorf("getting volume %d status from %s: %w", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}

	return files[0] == files[1], deleted[0] == deleted[1], nil
}
// shouldSkipVolume reports whether synchronizing replicas a and b can be skipped.
//
// Logic:
//  1. If the file counts recorded in a.info and b.info match — and, when
//     syncDeletions is enabled, the delete counts match too — the pair is skipped.
//  2. If the counts differ and at least one replica was last modified before the
//     pulse cutoff (vcd.now minus two volume pulse periods), the pair is NOT
//     skipped and no volume server is contacted.
//  3. If the counts differ and both replicas were modified at or after the
//     cutoff, the recorded counts may lag behind ongoing writes, so
//     eqVolumeFileCount fetches the current counts from the volume servers. The
//     pair is skipped only if the live file counts match and, when a delete-count
//     difference triggered the check, the live deleted counts match as well.
//
// Returns:
//   - bool: true if sync should be skipped
//   - error: a failure talking to a volume server in case 3, wrapped with %w and
//     the volume ID and both data nodes; the bool is false in that case
//
// Do logs a returned error and goes on to sync the pair anyway.
func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) {
	pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
	doSyncDeletedCount := vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount

	// Case 1: nothing differs in the recorded counts.
	if a.info.FileCount == b.info.FileCount && !doSyncDeletedCount {
		return true, nil
	}

	// Case 2: a replica has been idle since before the cutoff, so the
	// difference is taken at face value and the pair must be synced.
	if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
		return false, nil
	}

	// Case 3: both replicas changed recently; compare the live counts instead.
	eqFileCount, eqDeletedFileCount, err := vcd.eqVolumeFileCount(a, b)
	if err != nil {
		return false, fmt.Errorf("comparing volume %d file counts on %s and %s: %w",
			a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
	}
	if !eqFileCount || (doSyncDeletedCount && !eqDeletedFileCount) {
		return false, nil
	}

	vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n",
		a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
	return true, nil
}
func ( vcd * volumeCheckDisk ) syncTwoReplicas ( a * VolumeReplica , b * VolumeReplica ) ( err error ) {
aHasChanges , bHasChanges := true , true
const maxIterations = 5
iteration := 0
for ( aHasChanges || bHasChanges ) && iteration < maxIterations {
iteration ++
if verbose {
fmt . Fprintf ( c . writer , "sync iteration %d for volume %d\n" , iteration , a . info . Id )
}
vcd . writeVerbose ( "sync iteration %d for volume %d\n" , iteration , a . info . Id )
prevAHasChanges , prevBHasChanges := aHasChanges , bHasChanges
if aHasChanges , bHasChanges , err = c . checkBoth ( a , b , applyChanges , doSyncDeletions , nonRepairThreshold , verbose ) ; err != nil {
if aHasChanges , bHasChanges , err = vcd . checkBoth ( a , b ) ; err != nil {
return err
}
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && ( aHasChanges || bHasChanges ) {
fmt . Fprintf ( c . writer , "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n" ,
vcd . write ( "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n" ,
a . info . Id , a . location . dataNode . Id , b . location . dataNode . Id , iteration )
return fmt . Errorf ( "sync not making progress after %d iterations" , iteration )
}
}
if iteration >= maxIterations && ( aHasChanges || bHasChanges ) {
fmt . Fprintf ( c . writer , "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n" ,
vcd . write ( "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n" ,
a . info . Id , maxIterations , a . location . dataNode . Id , b . location . dataNode . Id )
return fmt . Errorf ( "reached maximum sync iterations (%d)" , maxIterations )
}
@ -219,7 +283,7 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl
return nil
}
func ( c * commandVolumeCheckDisk ) checkBoth ( a * VolumeReplica , b * VolumeReplica , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , verbose bool ) ( aHasChanges bool , bHasChanges bool , err error ) {
func ( vcd * volumeCheckDisk ) checkBoth ( a * VolumeReplica , b * VolumeReplica ) ( aHasChanges bool , bHasChanges bool , err error ) {
aDB , bDB := needle_map . NewMemDb ( ) , needle_map . NewMemDb ( )
defer func ( ) {
aDB . Close ( )
@ -227,17 +291,16 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
} ( )
// read index db
readIndexDbCutoffFrom := uint64 ( time . Now ( ) . UnixNano ( ) )
if err = readIndexDatabase ( aDB , a . info . Collection , a . info . Id , pb . NewServerAddressFromDataNode ( a . location . dataNode ) , verbose , c . writer , c . env . option . GrpcDialOption ) ; err != nil {
if err = vcd . readIndexDatabase ( aDB , a . info . Collection , a . info . Id , pb . NewServerAddressFromDataNode ( a . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , a . location . dataNode , a . info . Id , err )
}
if err := readIndexDatabase ( bDB , b . info . Collection , b . info . Id , pb . NewServerAddressFromDataNode ( b . location . dataNode ) , verbose , c . writer , c . env . option . GrpcDialOption ) ; err != nil {
if err := vcd . readIndexDatabase ( bDB , b . info . Collection , b . info . Id , pb . NewServerAddressFromDataNode ( b . location . dataNode ) ) ; err != nil {
return true , true , fmt . Errorf ( "readIndexDatabase %s volume %d: %v" , b . location . dataNode , b . info . Id , err )
}
// find and make up the differences
aHasChanges , err1 := doVolumeCheckDisk ( bDB , aDB , b , a , verbose , c . writer , applyChanges , doSyncDeletions , nonRepairThreshold , readIndexDbCutoffFrom , c . env . option . GrpcDialOption )
bHasChanges , err2 := doVolumeCheckDisk ( aDB , bDB , a , b , verbose , c . writer , applyChanges , doSyncDeletions , nonRepairThreshold , readIndexDbCutoffFrom , c . env . option . GrpcDialOption )
aHasChanges , err1 := vcd . doVolumeCheckDisk ( bDB , aDB , b , a )
bHasChanges , err2 := vcd . doVolumeCheckDisk ( aDB , bDB , a , b )
if err1 != nil {
return aHasChanges , bHasChanges , fmt . Errorf ( "doVolumeCheckDisk source:%s target:%s volume %d: %v" , b . location . dataNode . Id , a . location . dataNode . Id , b . info . Id , err1 )
}
@ -247,7 +310,7 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
return aHasChanges , bHasChanges , nil
}
func doVolumeCheckDisk ( minuend , subtrahend * needle_map . MemDb , source , target * VolumeReplica , verbose bool , writer io . Writer , applyChanges bool , doSyncDeletions bool , nonRepairThreshold float64 , cutoffFromAtNs uint64 , grpcDialOption grpc . DialOption ) ( hasChanges bool , err error ) {
func ( vcd * volumeCheckDisk ) doVolumeCheckDisk ( minuend , subtrahend * needle_map . MemDb , source , target * VolumeReplica ) ( hasChanges bool , err error ) {
// find missing keys
// hash join, can be more efficient
@ -255,6 +318,8 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
var partiallyDeletedNeedles [ ] needle_map . NeedleValue
var counter int
doCutoffOfLastNeedle := true
cutoffFromAtNs := uint64 ( vcd . now . UnixNano ( ) )
minuend . DescendingVisit ( func ( minuendValue needle_map . NeedleValue ) error {
counter ++
if subtrahendValue , found := subtrahend . Get ( minuendValue . Key ) ; ! found {
@ -262,7 +327,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return nil
}
if doCutoffOfLastNeedle {
if needleMeta , err := readNeedleMeta ( grpcDialOption , pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , minuendValue ) ; err == nil {
if needleMeta , err := readNeedleMeta ( vcd . grpcDialOption ( ) , pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , minuendValue ) ; err == nil {
// needles appended after the cutoff time are too recent to count as missing yet
if needleMeta . AppendAtNs > cutoffFromAtNs {
return nil
@ -282,7 +347,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return nil
} )
fmt . Fprintf ( writer , "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n" ,
vcd . write ( "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n" ,
source . info . Id , source . location . dataNode . Id , counter , target . location . dataNode . Id , len ( missingNeedles ) , len ( partiallyDeletedNeedles ) )
if counter == 0 || ( len ( missingNeedles ) == 0 && len ( partiallyDeletedNeedles ) == 0 ) {
@ -290,45 +355,40 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
}
missingNeedlesFraction := float64 ( len ( missingNeedles ) ) / float64 ( counter )
if missingNeedlesFraction > nonRepairThreshold {
if missingNeedlesFraction > vcd . nonRepairThreshold {
return false , fmt . Errorf (
"failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f" ,
source . info . Id , missingNeedlesFraction , nonRepairThreshold )
source . info . Id , missingNeedlesFraction , vcd . nonRepairThreshold )
}
for _ , needleValue := range missingNeedles {
needleBlob , err := readSourceNeedleBlob ( grpcDialOption , pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , needleValue )
needleBlob , err := vcd . readSourceNeedleBlob ( pb . NewServerAddressFromDataNode ( source . location . dataNode ) , source . info . Id , needleValue )
if err != nil {
return hasChanges , err
}
if ! applyChanges {
if ! vcd . applyChanges {
continue
}
if verbose {
fmt . Fprintf ( writer , "read %s %s => %s\n" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
}
vcd . writeVerbose ( "read %s %s => %s\n" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
hasChanges = true
if err = writeNeedleBlobToTarget ( grpcDialOption , pb . NewServerAddressFromDataNode ( target . location . dataNode ) , source . info . Id , needleValue , needleBlob ) ; err != nil {
if err = vcd . writeNeedleBlobToTarget ( pb . NewServerAddressFromDataNode ( target . location . dataNode ) , source . info . Id , needleValue , needleBlob ) ; err != nil {
return hasChanges , err
}
}
if doSyncDeletions && applyChanges && len ( partiallyDeletedNeedles ) > 0 {
if vcd . syncDeletions && vcd . applyChanges && len ( partiallyDeletedNeedles ) > 0 {
var fidList [ ] string
for _ , needleValue := range partiallyDeletedNeedles {
fidList = append ( fidList , needleValue . Key . FileId ( source . info . Id ) )
if verbose {
fmt . Fprintf ( writer , "delete %s %s => %s\n" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
}
vcd . writeVerbose ( "delete %s %s => %s\n" , needleValue . Key . FileId ( source . info . Id ) , source . location . dataNode . Id , target . location . dataNode . Id )
}
deleteResults := operation . DeleteFileIdsAtOneVolumeServer (
pb . NewServerAddressFromDataNode ( target . location . dataNode ) ,
grpcDialOption , fidList , false )
vcd . grpcDialOption ( ) , fidList , false )
// Check for errors in results
for _ , deleteResult := range deleteResults {
@ -343,9 +403,9 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return hasChanges , nil
}
func readSourceNeedleBlob ( grpcDialOption grpc . DialOption , sourceVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue ) ( needleBlob [ ] byte , err error ) {
func ( vcd * volumeCheckDisk ) readSourceNeedleBlob ( sourceVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue ) ( needleBlob [ ] byte , err error ) {
err = operation . WithVolumeServerClient ( false , sourceVolumeServer , grpcDialOption , func ( client volume_server_pb . VolumeServerClient ) error {
err = operation . WithVolumeServerClient ( false , sourceVolumeServer , vcd . grpcDialOption ( ) , func ( client volume_server_pb . VolumeServerClient ) error {
resp , err := client . ReadNeedleBlob ( context . Background ( ) , & volume_server_pb . ReadNeedleBlobRequest {
VolumeId : volumeId ,
Offset : needleValue . Offset . ToActualOffset ( ) ,
@ -360,9 +420,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.
return
}
func writeNeedleBlobToTarget ( grpcDialOption grpc . DialOption , targetVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue , needleBlob [ ] byte ) error {
func ( vcd * volumeCheckDisk ) writeNeedleBlobToTarget ( targetVolumeServer pb . ServerAddress , volumeId uint32 , needleValue needle_map . NeedleValue , needleBlob [ ] byte ) error {
return operation . WithVolumeServerClient ( false , targetVolumeServer , grpcDialOption , func ( client volume_server_pb . VolumeServerClient ) error {
return operation . WithVolumeServerClient ( false , targetVolumeServer , vcd . grpcDialOption ( ) , func ( client volume_server_pb . VolumeServerClient ) error {
_ , err := client . WriteNeedleBlob ( context . Background ( ) , & volume_server_pb . WriteNeedleBlobRequest {
VolumeId : volumeId ,
NeedleId : uint64 ( needleValue . Key ) ,
@ -371,25 +431,21 @@ func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer
} )
return err
} )
}
func readIndexDatabase ( db * needle_map . MemDb , collection string , volumeId uint32 , volumeServer pb . ServerAddress , verbose bool , writer io . Writer , grpcDialOption grpc . DialOption ) error {
func ( vcd * volumeCheckDisk ) readIndexDatabase ( db * needle_map . MemDb , collection string , volumeId uint32 , volumeServer pb . ServerAddress ) error {
var buf bytes . Buffer
if err := copyVolumeIndexFile ( collection , volumeId , volumeServer , & buf , verbose , writer , grpcDialOption ) ; err != nil {
if err := vcd . copyVolumeIndexFile ( collection , volumeId , volumeServer , & buf ) ; err != nil {
return err
}
if verbose {
fmt . Fprintf ( writer , "load collection %s volume %d index size %d from %s ...\n" , collection , volumeId , buf . Len ( ) , volumeServer )
}
vcd . writeVerbose ( "load collection %s volume %d index size %d from %s ...\n" , collection , volumeId , buf . Len ( ) , volumeServer )
return db . LoadFilterFromReaderAt ( bytes . NewReader ( buf . Bytes ( ) ) , true , false )
}
func copyVolumeIndexFile ( collection string , volumeId uint32 , volumeServer pb . ServerAddress , buf * bytes . Buffer , verbose bool , writer io . Writer , grpcDialOption grpc . DialOption ) error {
func ( vcd * volumeCheckDisk ) copyVolumeIndexFile ( collection string , volumeId uint32 , volumeServer pb . ServerAddress , buf * bytes . Buffer ) error {
return operation . WithVolumeServerClient ( true , volumeServer , grpcDialOption , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
return operation . WithVolumeServerClient ( true , volumeServer , vcd . grpcDialOption ( ) , func ( volumeServerClient volume_server_pb . VolumeServerClient ) error {
ext := ".idx"
@ -406,7 +462,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser
return fmt . Errorf ( "failed to start copying volume %d%s: %v" , volumeId , ext , err )
}
err = writeToBuffer ( copyFileClient , buf )
err = vcd . writeToBuffer ( copyFileClient , buf )
if err != nil {
return fmt . Errorf ( "failed to copy %d%s from %s: %v" , volumeId , ext , volumeServer , err )
}
@ -416,7 +472,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser
} )
}
func writeToBuffer ( client volume_server_pb . VolumeServer_CopyFileClient , buf * bytes . Buffer ) error {
func ( vcd * volumeCheckDisk ) writeToBuffer ( client volume_server_pb . VolumeServer_CopyFileClient , buf * bytes . Buffer ) error {
for {
resp , receiveErr := client . Recv ( )
if receiveErr == io . EOF {