Browse Source

Rework parameters passing for functions within `volume.check.disk`.

We'll need to rework this logic to account for read-only volumes, and there're already way too many parameters shuffled around.

Grouping these into a single struct simplifies the overall codebase.
pull/7448/head
Lisandro Pin 4 weeks ago
parent
commit
bc53d319a8
Failed to extract signature
  1. 256
      weed/shell/command_volume_check_disk.go
  2. 11
      weed/shell/command_volume_check_disk_test.go
  3. 23
      weed/shell/command_volume_fix_replication.go

256
weed/shell/command_volume_check_disk.go

@ -8,7 +8,6 @@ import (
"io"
"math"
"net/http"
"sync"
"time"
"slices"
@ -26,9 +25,18 @@ func init() {
Commands = append(Commands, &commandVolumeCheckDisk{})
}
type commandVolumeCheckDisk struct {
env *CommandEnv
type commandVolumeCheckDisk struct{}
type volumeCheckDisk struct {
commandEnv *CommandEnv
writer io.Writer
now time.Time
slowMode bool
verbose bool
applyChanges bool
syncDeletions bool
nonRepairThreshold float64
}
func (c *commandVolumeCheckDisk) Name() string {
@ -53,67 +61,6 @@ func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool {
return tag == ResourceHeavy
}
func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(vid),
})
if resp != nil {
totalFileCount = resp.FileCount
deletedFileCount = resp.FileDeletedCount
}
return reqErr
})
if err != nil {
fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err)
}
return totalFileCount, deletedFileCount
}
func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
var waitGroup sync.WaitGroup
var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
}()
// Trying to synchronize a remote call to two nodes
waitGroup.Wait()
return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB
}
func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool {
pulseTimeAtSecond := pulseTime.Unix()
doSyncDeletedCount := false
if syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
doSyncDeletedCount = true
}
if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
// Do synchronization of volumes, if the modification time was before the last pulsation time
if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
return false
}
if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount {
if doSyncDeletedCount && !eqDeletedFileCount {
return false
}
if verbose {
fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
}
} else {
return false
}
}
return true
}
func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -135,11 +82,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
c.env = commandEnv
c.writer = writer
vcd := &volumeCheckDisk{
commandEnv: commandEnv,
writer: writer,
now: time.Now(),
slowMode: *slowMode,
verbose: *verbose,
applyChanges: *applyChanges,
syncDeletions: *syncDeletions,
nonRepairThreshold: *nonRepairThreshold,
}
// collect topology information
pulseTime := time.Now().Add(-constants.VolumePulsePeriod * 2)
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
@ -155,7 +111,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
var writableReplicas []*VolumeReplica
for _, replica := range replicas {
if replica.info.ReadOnly {
fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
} else {
writableReplicas = append(writableReplicas, replica)
}
@ -166,13 +122,13 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
})
for len(writableReplicas) >= 2 {
a, b := writableReplicas[0], writableReplicas[1]
if !*slowMode && c.shouldSkipVolume(a, b, pulseTime, *syncDeletions, *verbose) {
if !vcd.slowMode && vcd.shouldSkipVolume(a, b) {
// always choose the larger volume to be the source
writableReplicas = append(replicas[:1], writableReplicas[2:]...)
continue
}
if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
if err := vcd.syncTwoReplicas(a, b); err != nil {
vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
// always choose the larger volume to be the source
if a.info.FileCount > b.info.FileCount {
@ -186,32 +142,108 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
func (vcd *volumeCheckDisk) isLocked() bool {
return vcd.commandEnv.isLocked()
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
return vcd.commandEnv.option.GrpcDialOption
}
func (vcd *volumeCheckDisk) write(format string, a ...any) {
fmt.Fprintf(vcd.writer, format, a...)
}
func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) {
if vcd.verbose {
fmt.Fprintf(vcd.writer, format, a...)
}
}
func (vcd *volumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(vid),
})
if resp != nil {
totalFileCount = resp.FileCount
deletedFileCount = resp.FileDeletedCount
}
return reqErr
})
if err != nil {
vcd.write("getting number of files for volume id %d from volumes status: %+v\n", vid, err)
}
return totalFileCount, deletedFileCount
}
func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64
ewg := NewErrorWaitGroup(DefaultMaxParallelization)
ewg.Add(func() error {
fileCountA, fileDeletedCountA = vcd.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
return nil
})
ewg.Add(func() error {
fileCountB, fileDeletedCountB = vcd.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
return nil
})
// Trying to synchronize a remote call to two nodes
// TODO: bubble errors up?
ewg.Wait()
return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB
}
func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) bool {
pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
doSyncDeletedCount := false
if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
doSyncDeletedCount = true
}
if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
// Do synchronization of volumes, if the modification time was before the last pulsation time
if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
return false
}
if eqFileCount, eqDeletedFileCount := vcd.eqVolumeFileCount(a, b); eqFileCount {
if doSyncDeletedCount && !eqDeletedFileCount {
return false
}
vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
} else {
return false
}
}
return true
}
func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) {
aHasChanges, bHasChanges := true, true
const maxIterations = 5
iteration := 0
for (aHasChanges || bHasChanges) && iteration < maxIterations {
iteration++
if verbose {
fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id)
}
vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id)
prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil {
return err
}
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration)
return fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
if iteration >= maxIterations && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
@ -219,7 +251,7 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl
return nil
}
func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) {
func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) {
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
defer func() {
aDB.Close()
@ -227,17 +259,16 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
}()
// read index db
readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
}
if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
}
// find and make up the differences
aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a)
bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b)
if err1 != nil {
return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
}
@ -247,7 +278,7 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
return aHasChanges, bHasChanges, nil
}
func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {
func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) {
// find missing keys
// hash join, can be more efficient
@ -255,6 +286,8 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
var partiallyDeletedNeedles []needle_map.NeedleValue
var counter int
doCutoffOfLastNeedle := true
cutoffFromAtNs := uint64(vcd.now.UnixNano())
minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error {
counter++
if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found {
@ -262,7 +295,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return nil
}
if doCutoffOfLastNeedle {
if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
if needleMeta, err := readNeedleMeta(vcd.grpcDialOption(), pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
// needles older than the cutoff time are not missing yet
if needleMeta.AppendAtNs > cutoffFromAtNs {
return nil
@ -282,7 +315,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return nil
})
fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
@ -290,45 +323,40 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
}
missingNeedlesFraction := float64(len(missingNeedles)) / float64(counter)
if missingNeedlesFraction > nonRepairThreshold {
if missingNeedlesFraction > vcd.nonRepairThreshold {
return false, fmt.Errorf(
"failed to start repair volume %d, percentage of missing keys is greater than the threshold: %.2f > %.2f",
source.info.Id, missingNeedlesFraction, nonRepairThreshold)
source.info.Id, missingNeedlesFraction, vcd.nonRepairThreshold)
}
for _, needleValue := range missingNeedles {
needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
needleBlob, err := vcd.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
if err != nil {
return hasChanges, err
}
if !applyChanges {
if !vcd.applyChanges {
continue
}
if verbose {
fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
hasChanges = true
if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
return hasChanges, err
}
}
if doSyncDeletions && applyChanges && len(partiallyDeletedNeedles) > 0 {
if vcd.syncDeletions && vcd.applyChanges && len(partiallyDeletedNeedles) > 0 {
var fidList []string
for _, needleValue := range partiallyDeletedNeedles {
fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
if verbose {
fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
deleteResults := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode),
grpcDialOption, fidList, false)
vcd.grpcDialOption(), fidList, false)
// Check for errors in results
for _, deleteResult := range deleteResults {
@ -343,9 +371,9 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
return hasChanges, nil
}
func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
func (vcd *volumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
err = operation.WithVolumeServerClient(false, sourceVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
VolumeId: volumeId,
Offset: needleValue.Offset.ToActualOffset(),
@ -360,9 +388,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.
return
}
func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
func (vcd *volumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
return operation.WithVolumeServerClient(false, targetVolumeServer, vcd.grpcDialOption(), func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@ -371,25 +399,21 @@ func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer
})
return err
})
}
func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress) error {
var buf bytes.Buffer
if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil {
if err := vcd.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf); err != nil {
return err
}
if verbose {
fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
}
vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
}
func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error {
func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer) error {
return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
return operation.WithVolumeServerClient(true, volumeServer, vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
@ -406,7 +430,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser
return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
err = writeToBuffer(copyFileClient, buf)
err = vcd.writeToBuffer(copyFileClient, buf)
if err != nil {
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, volumeServer, err)
}
@ -416,7 +440,7 @@ func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.Ser
})
}
func writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error {
func (vcd *volumeCheckDisk) writeToBuffer(client volume_server_pb.VolumeServer_CopyFileClient, buf *bytes.Buffer) error {
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {

11
weed/shell/command_volume_check_disk_test.go

@ -20,8 +20,6 @@ type shouldSkipVolume struct {
}
func TestShouldSkipVolume(t *testing.T) {
cmdVolumeCheckDisk := testCommandVolumeCheckDisk{}
cmdVolumeCheckDisk.writer = os.Stdout
var tests = []shouldSkipVolume{
{
VolumeReplica{nil, &master_pb.VolumeInformationMessage{
@ -67,8 +65,13 @@ func TestShouldSkipVolume(t *testing.T) {
},
}
for num, tt := range tests {
pulseTime := time.Unix(tt.pulseTimeAtSecond, 0)
if isShould := cmdVolumeCheckDisk.shouldSkipVolume(&tt.a, &tt.b, pulseTime, true, true); isShould != tt.shouldSkipVolume {
vcd := &volumeCheckDisk{
writer: os.Stdout,
now: time.Unix(tt.pulseTimeAtSecond, 0),
verbose: true,
syncDeletions: true,
}
if isShould := vcd.shouldSkipVolume(&tt.a, &tt.b); isShould != tt.shouldSkipVolume {
t.Fatalf("result of should skip volume is unexpected for %d test", num)
}
}

23
weed/shell/command_volume_fix_replication.go

@ -16,7 +16,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@ -204,22 +203,32 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) {
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv) (err error) {
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
defer func() {
aDB.Close()
bDB.Close()
}()
vcd := &volumeCheckDisk{
writer: writer,
commandEnv: commandEnv,
now: time.Now(),
verbose: false,
applyChanges: true,
syncDeletions: false,
nonRepairThreshold: float64(1),
}
// read index db
readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil {
if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
}
if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil {
if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
}
if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil {
if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil {
return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
}
return
@ -271,7 +280,7 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
if replicaB.location.dataNode == replica.location.dataNode {
continue
}
if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil {
if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv); checkErr != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
break
}

Loading…
Cancel
Save