diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index ca7efa5d4..3f9859045 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -3,11 +3,13 @@ package shell import ( "bytes" "context" + "errors" "flag" "fmt" "io" "math" "net/http" + "strings" "time" "slices" @@ -158,7 +160,7 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V continue } } - if err := vcd.syncTwoReplicas(a, b); err != nil { + if err := vcd.syncTwoReplicas(a, b, true); err != nil { vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } // always choose the larger volume to be the source @@ -282,62 +284,81 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) return true, nil } -func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) { - aHasChanges, bHasChanges := true, true +// syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode +// is enabled, changes from target are also synced back into the source. +func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) { + sourceHasChanges, targetHasChanges := true, true const maxIterations = 5 iteration := 0 - for (aHasChanges || bHasChanges) && iteration < maxIterations { + for (sourceHasChanges || targetHasChanges) && iteration < maxIterations { iteration++ - vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id) + vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) - prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges - if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil { + prevSourceHasChanges, prevDestHasChanges := sourceHasChanges, targetHasChanges + if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { return err } // Detect if we're stuck in a loop with no progress - if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) { + if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevDestHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration) + source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration) } } - if iteration >= maxIterations && (aHasChanges || bHasChanges) { + if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) { vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", - a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id) + source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } return nil } -func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) { - aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() +// checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source. +// Returns whether the source and/or target were modified. +func (vcd *volumeCheckDisk) checkBoth(source, target *VolumeReplica, bidi bool) (sourceHasChanges bool, targetHasChanges bool, err error) { + sourceDB, targetDB := needle_map.NewMemDb(), needle_map.NewMemDb() + if sourceDB == nil || targetDB == nil { + return false, false, fmt.Errorf("failed to allocate in-memory needle DBs") + } defer func() { - aDB.Close() - bDB.Close() + sourceDB.Close() + targetDB.Close() }() // read index db - if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil { - return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) + if err = vcd.readIndexDatabase(sourceDB, source.info.Collection, source.info.Id, pb.NewServerAddressFromDataNode(source.location.dataNode)); err != nil { + return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", source.location.dataNode.Id, source.info.Id, err) } - if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil { - return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) + if err := vcd.readIndexDatabase(targetDB, target.info.Collection, target.info.Id, pb.NewServerAddressFromDataNode(target.location.dataNode)); err != nil { + return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", target.location.dataNode.Id, target.info.Id, err) } // find and make up the differences - aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a) - bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b) - if err1 != nil { - return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) + errStrs := []string{} + targetHasChanges, err = vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target) + if err != nil { + errStrs = append( + errStrs, + fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, err)) + } + sourceHasChanges = false + if bidi { + sourceHasChanges, err = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source) + if err != nil { + errStrs = append( + errStrs, + fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, err)) + } } - if err2 != nil { - return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2) + if len(errStrs) != 0 { + return sourceHasChanges, targetHasChanges, errors.New(strings.Join(errStrs, ", ")) } - return aHasChanges, bHasChanges, nil + + return sourceHasChanges, targetHasChanges, nil } func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) {