From bc53d319a803a0b7f70b0dd0db5258487deb668c Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 7 Nov 2025 10:40:15 +0100 Subject: [PATCH] Rework parameters passing for functions within `volume.check.disk`. We'll need to rework this logic to account for read-only volumes, and there're already way too many parameters shuffled around. Grouping these into a single struct simplifies the overall codebase. --- weed/shell/command_volume_check_disk.go | 258 ++++++++++--------- weed/shell/command_volume_check_disk_test.go | 11 +- weed/shell/command_volume_fix_replication.go | 23 +- 3 files changed, 164 insertions(+), 128 deletions(-) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 2abfc288c..fe4cbc92f 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -8,7 +8,6 @@ import ( "io" "math" "net/http" - "sync" "time" "slices" @@ -26,9 +25,18 @@ func init() { Commands = append(Commands, &commandVolumeCheckDisk{}) } -type commandVolumeCheckDisk struct { - env *CommandEnv - writer io.Writer +type commandVolumeCheckDisk struct{} + +type volumeCheckDisk struct { + commandEnv *CommandEnv + writer io.Writer + now time.Time + + slowMode bool + verbose bool + applyChanges bool + syncDeletions bool + nonRepairThreshold float64 } func (c *commandVolumeCheckDisk) Name() string { @@ -53,67 +61,6 @@ func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool { return tag == ResourceHeavy } -func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { - err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ - VolumeId: uint32(vid), - }) - if resp != nil { - totalFileCount = resp.FileCount - deletedFileCount = resp.FileDeletedCount - } - return reqErr - }) - if err != nil { - fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err) - } - return totalFileCount, deletedFileCount -} - -func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) { - var waitGroup sync.WaitGroup - var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64 - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) - }() - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) - }() - // Trying to synchronize a remote call to two nodes - waitGroup.Wait() - return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB -} - -func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool { - pulseTimeAtSecond := pulseTime.Unix() - doSyncDeletedCount := false - if syncDeletions && a.info.DeleteCount != b.info.DeleteCount { - doSyncDeletedCount = true - } - if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { - // Do synchronization of volumes, if the modification time was before the last pulsation time - if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { - return false - } - if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount { - if doSyncDeletedCount && !eqDeletedFileCount { - return false - } - if verbose { - fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - } - } else { - return false - } - } - return true -} - func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -135,11 +82,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err = commandEnv.confirmIsLocked(args); err != nil { return } - c.env = commandEnv - c.writer = writer + + vcd := &volumeCheckDisk{ + commandEnv: commandEnv, + writer: writer, + now: time.Now(), + + slowMode: *slowMode, + verbose: *verbose, + applyChanges: *applyChanges, + syncDeletions: *syncDeletions, + nonRepairThreshold: *nonRepairThreshold, + } // collect topology information - pulseTime := time.Now().Add(-constants.VolumePulsePeriod * 2) topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err @@ -155,7 +111,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -166,13 +122,13 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write }) for len(writableReplicas) >= 2 { a, b := writableReplicas[0], writableReplicas[1] - if !*slowMode && c.shouldSkipVolume(a, b, pulseTime, *syncDeletions, *verbose) { + if !vcd.slowMode && vcd.shouldSkipVolume(a, b) { // always choose the larger volume to be the source writableReplicas = append(replicas[:1], writableReplicas[2:]...) continue } - if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { - fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + if err := vcd.syncTwoReplicas(a, b); err != nil { + vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { @@ -186,32 +142,108 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } -func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) { +func (vcd *volumeCheckDisk) isLocked() bool { + return vcd.commandEnv.isLocked() +} + +func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { + return vcd.commandEnv.option.GrpcDialOption +} + +func (vcd *volumeCheckDisk) write(format string, a ...any) { + fmt.Fprintf(vcd.writer, format, a...) +} + +func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { + if vcd.verbose { + fmt.Fprintf(vcd.writer, format, a...) + } +} + +func (vcd *volumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if resp != nil { + totalFileCount = resp.FileCount + deletedFileCount = resp.FileDeletedCount + } + return reqErr + }) + if err != nil { + vcd.write("getting number of files for volume id %d from volumes status: %+v\n", vid, err) + } + return totalFileCount, deletedFileCount +} + +func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) { + var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64 + + ewg := NewErrorWaitGroup(DefaultMaxParallelization) + ewg.Add(func() error { + fileCountA, fileDeletedCountA = vcd.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) + return nil + }) + ewg.Add(func() error { + fileCountB, fileDeletedCountB = vcd.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) + return nil + }) + // Trying to synchronize a remote call to two nodes + // TODO: bubble errors up? + ewg.Wait() + + return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB +} + +func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) bool { + pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix() + doSyncDeletedCount := false + if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount { + doSyncDeletedCount = true + } + if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { + // Do synchronization of volumes, if the modification time was before the last pulsation time + if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { + return false + } + if eqFileCount, eqDeletedFileCount := vcd.eqVolumeFileCount(a, b); eqFileCount { + if doSyncDeletedCount && !eqDeletedFileCount { + return false + } + vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n", + a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) + } else { + return false + } + } + return true +} + +func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) { aHasChanges, bHasChanges := true, true const maxIterations = 5 iteration := 0 for (aHasChanges || bHasChanges) && iteration < maxIterations { iteration++ - if verbose { - fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id) - } + vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id) prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges - if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil { + if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil { return err } // Detect if we're stuck in a loop with no progress if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) { - fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", + vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration) } } if iteration >= maxIterations && (aHasChanges || bHasChanges) { - fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", + vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } @@ -219,7 +251,7 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl return nil } -func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) { +func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() @@ -227,17 +259,16 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a }() // read index db - readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { + if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } // find and make up the differences - aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) - bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a) + bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b) if err1 != nil { return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } @@ -247,7 +278,7 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a return aHasChanges, bHasChanges, nil } -func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { +func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) { // find missing keys // hash join, can be more efficient @@ -255,6 +286,8 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo var partiallyDeletedNeedles []needle_map.NeedleValue var counter int doCutoffOfLastNeedle := true + cutoffFromAtNs := uint64(vcd.now.UnixNano()) + minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error { counter++ if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found { @@ -262,7 +295,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { + if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil @@ -282,7 +315,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return nil }) - fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -290,45 +323,40 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo } missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter) - if missingNeedlesFraction > nonRepairThreshold { + if missingNeedlesFraction > vcd.nonRepairThreshold { return false, fmt.Errorf( "failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f", - source.info.Id, missingNeedlesFraction, nonRepairThreshold) + source.info.Id, missingNeedlesFraction, vcd.nonRepairThreshold) } for _, needleValue := range missingNeedles { - needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) + needleBlob, err := vcd.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } - if !applyChanges { + if !vcd.applyChanges { continue } - if verbose { - fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } - + vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true - if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { + if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { return hasChanges, err } } - if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 { + if vcd.syncDeletions && vcd.applyChanges && len(partiallyDeletedNeedles) > 0 { var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - if verbose { - fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) - } + vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), - grpcDialOption, fidList, false) + vcd.grpcDialOption(), fidList, false) // Check for errors in results for _, deleteResult := range deleteResults { @@ -343,9 +371,9 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo return hasChanges, nil } -func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { +func (vcd *volumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, Offset: needleValue.Offset.ToActualOffset(), @@ -360,9 +388,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb. return } -func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { +func (vcd *volumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -371,25 +399,21 @@ func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer }) return err }) - } -func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { - +func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress) error { var buf bytes.Buffer - if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil { + if err := vcd.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf); err != nil { return err } - if verbose { - fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) - } + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } -func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { +func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer) error { - return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" @@ -406,7 +430,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err) } - err = writeToBuffer(copyFileClient, buf) + err = vcd.writeToBuffer(copyFileClient, buf) if err != nil { return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err) } @@ -416,7 +440,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser }) } -func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { +func (vcd *volumeCheckDisk) writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error { for { resp, receiveErr := client.Recv() if receiveErr == io.EOF { diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go index d86b40f1f..727245b0d 100644 --- a/weed/shell/command_volume_check_disk_test.go +++ b/weed/shell/command_volume_check_disk_test.go @@ -20,8 +20,6 @@ type shouldSkipVolume struct { } func TestShouldSkipVolume(t *testing.T) { - cmdVolumeCheckDisk := testCommandVolumeCheckDisk{} - cmdVolumeCheckDisk.writer = os.Stdout var tests = []shouldSkipVolume{ { VolumeReplica{nil, &master_pb.VolumeInformationMessage{ @@ -67,8 +65,13 @@ func TestShouldSkipVolume(t *testing.T) { }, } for num, tt := range tests { - pulseTime := time.Unix(tt.pulseTimeAtSecond, 0) - if isShould := cmdVolumeCheckDisk.shouldSkipVolume(&tt.a, &tt.b, pulseTime, true, true); isShould != tt.shouldSkipVolume { + vcd := &volumeCheckDisk{ + writer: os.Stdout, + now: time.Unix(tt.pulseTimeAtSecond, 0), + verbose: true, + syncDeletions: true, + } + if isShould := vcd.shouldSkipVolume(&tt.a, &tt.b); isShould != tt.shouldSkipVolume { t.Fatalf("result of should skip volume is unexpected for %d test", num) } } diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 29bfe3f76..f4dd0239a 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -16,7 +16,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -204,22 +203,32 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica -func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) { +func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv) (err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() bDB.Close() }() + vcd := &volumeCheckDisk{ + writer: writer, + commandEnv: commandEnv, + now: time.Now(), + + verbose: false, + applyChanges: true, + syncDeletions: false, + nonRepairThreshold: float64(1), + } + // read index db - readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil { + if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil { + if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } - if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil { + if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil { return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) } return @@ -271,7 +280,7 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr if replicaB.location.dataNode == replica.location.dataNode { continue } - if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil { + if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv); checkErr != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr) break }