From 9744382a183f9b2d0ca0db44ffd30192631000aa Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Tue, 11 Nov 2025 01:03:38 +0100 Subject: [PATCH] Rework parameters passing for functions within `volume.check.disk`. (#7448) * Rework parameters passing for functions within `volume.check.disk`. We'll need to rework this logic to account for read-only volumes, and there're already way too many parameters shuffled around. Grouping these into a single struct simplifies the overall codebase. * similar fix * Improved Error Handling in Tests * propagate the errors * edge cases * edge case on modified time * clean up --------- Co-authored-by: chrislu --- weed/shell/command_volume_check_disk.go | 296 +++++++++++-------- weed/shell/command_volume_check_disk_test.go | 265 +++++++++++++++-- weed/shell/command_volume_fix_replication.go | 23 +- 3 files changed, 437 insertions(+), 147 deletions(-) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 2abfc288c..740c9679d 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -8,7 +8,6 @@ import ( "io" "math" "net/http" - "sync" "time" "slices" @@ -26,9 +25,18 @@ func init() { Commands = append(Commands, &commandVolumeCheckDisk{}) } -type commandVolumeCheckDisk struct { - env *CommandEnv - writer io.Writer +type commandVolumeCheckDisk struct{} + +type volumeCheckDisk struct { + commandEnv *CommandEnv + writer io.Writer + now time.Time + + slowMode bool + verbose bool + applyChanges bool + syncDeletions bool + nonRepairThreshold float64 } func (c *commandVolumeCheckDisk) Name() string { @@ -53,67 +61,6 @@ func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool { return tag == ResourceHeavy } -func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { - err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ - VolumeId: uint32(vid), - }) - if resp != nil { - totalFileCount = resp.FileCount - deletedFileCount = resp.FileDeletedCount - } - return reqErr - }) - if err != nil { - fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err) - } - return totalFileCount, deletedFileCount -} - -func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) { - var waitGroup sync.WaitGroup - var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64 - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) - }() - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) - }() - // Trying to synchronize a remote call to two nodes - waitGroup.Wait() - return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB -} - -func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool { - pulseTimeAtSecond := pulseTime.Unix() - doSyncDeletedCount := false - if syncDeletions && a.info.DeleteCount != b.info.DeleteCount { - doSyncDeletedCount = true - } - if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { 
- // Do synchronization of volumes, if the modification time was before the last pulsation time - if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { - return false - } - if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount { - if doSyncDeletedCount && !eqDeletedFileCount { - return false - } - if verbose { - fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - } - } else { - return false - } - } - return true -} - func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -135,11 +82,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err = commandEnv.confirmIsLocked(args); err != nil { return } - c.env = commandEnv - c.writer = writer + + vcd := &volumeCheckDisk{ + commandEnv: commandEnv, + writer: writer, + now: time.Now(), + + slowMode: *slowMode, + verbose: *verbose, + applyChanges: *applyChanges, + syncDeletions: *syncDeletions, + nonRepairThreshold: *nonRepairThreshold, + } // collect topology information - pulseTime := time.Now().Add(-constants.VolumePulsePeriod * 2) topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err @@ -155,7 +111,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -166,13 +122,19 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write }) for len(writableReplicas) >= 2 { a, b := writableReplicas[0], writableReplicas[1] - if !*slowMode && c.shouldSkipVolume(a, b, pulseTime, *syncDeletions, *verbose) { - // always choose the larger volume to be the source - writableReplicas = append(replicas[:1], writableReplicas[2:]...) - continue + if !vcd.slowMode { + shouldSkip, err := vcd.shouldSkipVolume(a, b) + if err != nil { + vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err) + // Continue with sync despite error to be safe + } else if shouldSkip { + // always choose the larger volume to be the source + writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) 
+ continue + } } - if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { - fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + if err := vcd.syncTwoReplicas(a, b); err != nil { + vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { @@ -186,32 +148,134 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } -func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) { +func (vcd *volumeCheckDisk) isLocked() bool { + return vcd.commandEnv.isLocked() +} + +func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { + return vcd.commandEnv.option.GrpcDialOption +} + +func (vcd *volumeCheckDisk) write(format string, a ...any) { + fmt.Fprintf(vcd.writer, format, a...) +} + +func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { + if vcd.verbose { + fmt.Fprintf(vcd.writer, format, a...) + } +} + +// getVolumeStatusFileCount retrieves the current file count and deleted file count +// from a volume server via gRPC. +func (vcd *volumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64, err error) { + err = operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if resp != nil { + totalFileCount = resp.FileCount + deletedFileCount = resp.FileDeletedCount + } + return reqErr + }) + return totalFileCount, deletedFileCount, err +} + +// eqVolumeFileCount compares the real-time file counts of two volume replicas +// by making sequential gRPC calls to their volume servers. +// +// Returns: +// - bool: true if file counts match +// - bool: true if deleted file counts match +// - error: any error from volume server communication +// +// Error Handling: Errors from getVolumeStatusFileCount are wrapped with context +// (volume ID and server) and propagated up. Uses fmt.Errorf with %w to maintain +// error chain for errors.Is() and errors.As(). +func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool, error) { + fileCountA, fileDeletedCountA, errA := vcd.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) + if errA != nil { + return false, false, fmt.Errorf("getting volume %d status from %s: %w", a.info.Id, a.location.dataNode.Id, errA) + } + + fileCountB, fileDeletedCountB, errB := vcd.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) + if errB != nil { + return false, false, fmt.Errorf("getting volume %d status from %s: %w", b.info.Id, b.location.dataNode.Id, errB) + } + + return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB, nil +} + +// shouldSkipVolume determines whether two volume replicas should skip synchronization. +// +// Logic: +// 1. If file counts and delete counts match (when syncDeletions enabled), skip sync +// 2. 
If counts differ AND at least one volume was modified before the pulse cutoff
+// (< pulseTimeAtSecond), the difference is considered real, so we return false and
+// the volumes get synchronized
+// 3. If counts differ AND both volumes were modified recently (>= pulseTimeAtSecond),
+// they may still be actively receiving writes, so call eqVolumeFileCount to get
+// real-time counts from the volume servers and skip only if those match
+//
+// Returns:
+// - bool: true if sync should be skipped
+// - error: any error from volume server communication (when eqVolumeFileCount is called)
+//
+// Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated.
+// The Do method logs these errors and continues processing to ensure other volumes are checked.
+func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) {
+	pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
+	doSyncDeletedCount := false
+	if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
+		doSyncDeletedCount = true
+	}
+	if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
+		// Do synchronization of volumes, if the modification time was before the last pulsation time
+		if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
+			return false, nil
+		}
+		eqFileCount, eqDeletedFileCount, err := vcd.eqVolumeFileCount(a, b)
+		if err != nil {
+			return false, fmt.Errorf("comparing volume %d file counts on %s and %s: %w",
+				a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
+		}
+		if eqFileCount {
+			if doSyncDeletedCount && !eqDeletedFileCount {
+				return false, nil
+			}
+			vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n",
+				a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
+		} else {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) {
 	aHasChanges, bHasChanges := true, true
 	const maxIterations = 5
 	iteration := 0
 	for (aHasChanges || bHasChanges) && iteration < maxIterations {
 		iteration++
-		if verbose {
-			fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id)
-		}
+		vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id)
 		prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
-		if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
+		if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil {
 			return err
 		}
 		// Detect if we're stuck in a loop with no progress
 		if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
-			fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
+			vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
 				a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration)
 			return fmt.Errorf("sync not making progress after %d iterations", iteration)
 		}
 	}
 	if iteration >= maxIterations && (aHasChanges || bHasChanges) {
-		fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
+		vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
 			a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
 		return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
 	}
@@ 
-219,7 +283,7 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl return nil } -func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) { +func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() @@ -227,17 +291,16 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a }() // read index db - readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } // find and make up the differences - aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) - bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a) + bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b) if err1 != nil { return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } @@ -247,7 +310,7 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a return aHasChanges, bHasChanges, nil } -func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { +func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) { // find missing keys // hash join, can be more efficient @@ -255,6 +318,8 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo var partiallyDeletedNeedles []needle_map.NeedleValue var counter int doCutoffOfLastNeedle := true + cutoffFromAtNs := uint64(vcd.now.UnixNano()) + minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error { counter++ if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found { @@ -262,7 +327,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), 
source.info.Id, minuendValue); err == nil { + if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil @@ -282,7 +347,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil }) - fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -290,45 +355,40 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo } missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter) - if missingNeedlesFraction > nonRepairThreshold { + if missingNeedlesFraction > vcd.nonRepairThreshold { return false, fmt.Errorf( "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f", - source.info.Id, missingNeedlesFraction, nonRepairThreshold) + source.info.Id, missingNeedlesFraction, vcd.nonRepairThreshold) } for _, needleValue := range missingNeedles { - needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) + needleBlob, err := vcd.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } - if !applyChanges { + if !vcd.applyChanges { continue } - if verbose { - fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } - + vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true - if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { + if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { return hasChanges, err } } - if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 { + if vcd.syncDeletions && vcd.applyChanges && len(partiallyDeletedNeedles) > 0 { var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - if verbose { - fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } + vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), - grpcDialOption, fidList, false) + vcd.grpcDialOption(), fidList, false) // Check for errors in results for _, deleteResult := range deleteResults { @@ -343,9 +403,9 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return hasChanges, nil } -func readSourceNeedleBlob(grpcDialOption grpc.DialOption, 
sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { +func (vcd *volumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, Offset: needleValue.Offset.ToActualOffset(), @@ -360,9 +420,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb. return } -func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { +func (vcd *volumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -371,25 +431,21 @@ func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer }) return err }) - } -func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { - +func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress) error { var buf bytes.Buffer - if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil { + if err := vcd.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf); err != nil { return err } - if verbose { - fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) - } + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } -func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { +func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer) error { - return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" @@ -406,7 +462,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } - err = 
writeToBuffer(copyFileClient, buf) + err = vcd.writeToBuffer(copyFileClient, buf) if err != nil { return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err) } @@ -416,7 +472,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser }) } -func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { +func (vcd *volumeCheckDisk) writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { for { resp, receiveErr := client.Recv() if receiveErr == io.EOF { diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go index d86b40f1f..eee9103a8 100644 --- a/weed/shell/command_volume_check_disk_test.go +++ b/weed/shell/command_volume_check_disk_test.go @@ -1,7 +1,7 @@ package shell import ( - "os" + "bytes" "testing" "time" @@ -13,63 +13,288 @@ type testCommandVolumeCheckDisk struct { } type shouldSkipVolume struct { + name string a VolumeReplica b VolumeReplica pulseTimeAtSecond int64 + syncDeletions bool shouldSkipVolume bool } func TestShouldSkipVolume(t *testing.T) { - cmdVolumeCheckDisk := testCommandVolumeCheckDisk{} - cmdVolumeCheckDisk.writer = os.Stdout var tests = []shouldSkipVolume{ { - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + name: "identical volumes should be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1000, DeleteCount: 100, ModifiedAtSecond: 1696583300}, }, - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1000, DeleteCount: 100, ModifiedAtSecond: 1696583300}, }, - 1696583400, - true, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, }, { - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + name: "different file counts should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1001, DeleteCount: 100, ModifiedAtSecond: 1696583300}, }, - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1000, DeleteCount: 100, ModifiedAtSecond: 1696583300}, }, - 1696583400, - false, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, }, { - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + name: "different delete counts with syncDeletions enabled should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1000, DeleteCount: 100, ModifiedAtSecond: 1696583300}, }, - VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ FileCount: 1000, DeleteCount: 101, ModifiedAtSecond: 1696583300}, }, - 1696583400, - false, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, }, + { + name: "different delete counts with syncDeletions disabled should be skipped if file counts match", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583300}, + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 101, + ModifiedAtSecond: 1696583300}, + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: false, + shouldSkipVolume: true, + }, + // Edge case: Zero file and delete counts + { + name: "volumes with zero file counts should be skipped", + a: VolumeReplica{nil, 
&master_pb.VolumeInformationMessage{ + FileCount: 0, + DeleteCount: 0, + ModifiedAtSecond: 1696583300}, + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 0, + DeleteCount: 0, + ModifiedAtSecond: 1696583300}, + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, + }, + { + name: "volumes with zero and non-zero file counts should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1, + DeleteCount: 0, + ModifiedAtSecond: 1696583300}, + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 0, + DeleteCount: 0, + ModifiedAtSecond: 1696583300}, + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, + }, + // Edge case: Recently modified volumes (after pulse time) + // Note: VolumePulsePeriod is 10 seconds, so pulse cutoff is now - 20 seconds + // When both volumes are recently modified, skip check to avoid false positives + { + name: "recently modified volumes with same file counts should be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583395}, // Modified 5 seconds ago + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583390}, // Modified 10 seconds ago + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, // Same counts = skip + }, + { + name: "one volume modified before pulse cutoff with different file counts should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583370}, // Modified 30 seconds ago (before cutoff at -20s) + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 999, + DeleteCount: 100, + ModifiedAtSecond: 1696583370}, // Same modification time + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, // Different counts + old enough = needs sync + }, + // Edge case: Different ModifiedAtSecond values, same file counts + { + name: "different modification times with same file counts should be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583300}, // 100 seconds before pulse time + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583350}, // 50 seconds before pulse time + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, // Same counts, both before cutoff + }, + // Edge case: Very close to pulse time boundary + { + name: "volumes modified exactly at pulse cutoff boundary with different counts should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1001, + DeleteCount: 100, + ModifiedAtSecond: 1696583379}, // Just before cutoff (pulseTime - 21s) + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583379}, // Just before cutoff + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, // At boundary with different counts - needs sync + }, + { + name: "volumes modified just after pulse cutoff boundary with same counts should be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + 
ModifiedAtSecond: 1696583381}, // Just after cutoff (pulseTime - 19s) + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583381}, // Just after cutoff + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, // Same counts + recent = skip to avoid false positive + }, + // Edge case: Large file count differences + { + name: "large file count difference with old modification time should not be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 10000, + DeleteCount: 100, + ModifiedAtSecond: 1696583300}, + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 5000, + DeleteCount: 100, + ModifiedAtSecond: 1696583300}, + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: false, // Large difference requires sync + }, + // Edge case: Both volumes modified AFTER pulse cutoff time + // When ModifiedAtSecond >= pulseTimeAtSecond for both volumes with same counts, + // the condition (a.info.FileCount != b.info.FileCount) is false, so we skip + // without calling eqVolumeFileCount + { + name: "both volumes modified after pulse cutoff with same file counts should be skipped", + a: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583405}, // After pulse cutoff (1696583380) + }, + b: VolumeReplica{nil, &master_pb.VolumeInformationMessage{ + FileCount: 1000, + DeleteCount: 100, + ModifiedAtSecond: 1696583410}, // After pulse cutoff + }, + pulseTimeAtSecond: 1696583400, + syncDeletions: true, + shouldSkipVolume: true, // Same counts = skip without calling eqVolumeFileCount + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + vcd := &volumeCheckDisk{ + writer: &buf, + now: time.Unix(tt.pulseTimeAtSecond, 0), + verbose: false, // reduce noise in tests + syncDeletions: tt.syncDeletions, + } + result, err := vcd.shouldSkipVolume(&tt.a, &tt.b) + if err != nil { + // In unit tests, we expect no errors from shouldSkipVolume + // since we're using test data without actual network calls + t.Errorf("shouldSkipVolume() returned unexpected error: %v", err) + return + } + if result != tt.shouldSkipVolume { + t.Errorf("shouldSkipVolume() = %v, want %v\nFileCount A=%d B=%d, DeleteCount A=%d B=%d", + result, tt.shouldSkipVolume, + tt.a.info.FileCount, tt.b.info.FileCount, + tt.a.info.DeleteCount, tt.b.info.DeleteCount) + } + }) + } +} + +// TestVolumeCheckDiskHelperMethods tests the helper methods on volumeCheckDisk +func TestVolumeCheckDiskHelperMethods(t *testing.T) { + var buf bytes.Buffer + vcd := &volumeCheckDisk{ + writer: &buf, + verbose: true, } - for num, tt := range tests { - pulseTime := time.Unix(tt.pulseTimeAtSecond, 0) - if isShould := cmdVolumeCheckDisk.shouldSkipVolume(&tt.a, &tt.b, pulseTime, true, true); isShould != tt.shouldSkipVolume { - t.Fatalf("result of should skip volume is unexpected for %d test", num) - } + + // Test write method + vcd.write("test %s\n", "message") + if buf.String() != "test message\n" { + t.Errorf("write() output = %q, want %q", buf.String(), "test message\n") + } + + // Test writeVerbose with verbose=true + buf.Reset() + vcd.writeVerbose("verbose %d\n", 123) + if buf.String() != "verbose 123\n" { + t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n") + } + + // Test writeVerbose with verbose=false + buf.Reset() + vcd.verbose = 
false + vcd.writeVerbose("should not appear\n") + if buf.String() != "" { + t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String()) } } diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 29bfe3f76..f4dd0239a 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -16,7 +16,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -204,22 +203,32 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica -func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) { +func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv) (err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() bDB.Close() }() + vcd := &volumeCheckDisk{ + writer: writer, + commandEnv: commandEnv, + now: time.Now(), + + verbose: false, + applyChanges: true, + syncDeletions: false, + nonRepairThreshold: float64(1), + } + // read index db - readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil { + if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil { + if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } - if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil { + if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil { return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) } return @@ -271,7 +280,7 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr if replicaB.location.dataNode == replica.location.dataNode { continue } - if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil { + if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv); checkErr != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr) break }