|
|
|
@ -9,7 +9,6 @@ import ( |
|
|
|
"io" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
|
"slices" |
|
|
|
@ -53,8 +52,17 @@ func (c *commandVolumeCheckDisk) Help() string { |
|
|
|
find all volumes that are replicated |
|
|
|
for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count. |
|
|
|
for the pair volume A and B |
|
|
|
append entries in A and not in B to B |
|
|
|
append entries in B and not in A to A |
|
|
|
bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A |
|
|
|
uni-directional sync (read-only repair): only sync from source to target without modifying source |
|
|
|
|
|
|
|
Options: |
|
|
|
-slow: check all replicas even if file counts are the same |
|
|
|
-v: verbose mode with detailed progress output |
|
|
|
-volumeId: check only a specific volume ID (0 for all) |
|
|
|
-apply: actually apply the fixes (default is simulation mode) |
|
|
|
-force-readonly: also check and repair read-only volumes using uni-directional sync |
|
|
|
-syncDeleted: sync deletion records during repair |
|
|
|
-nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3) |
|
|
|
|
|
|
|
` |
|
|
|
} |
|
|
|
@ -175,9 +183,67 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// checkReadOnlyVolumes fixes read-only volume replicas by performing uni-directional sync
// from a healthy source to read-only targets without modifying the source.
//
// NOTE(review): the early return directly below stubs this function out; everything
// after it is unreachable dead code kept as the intended future implementation —
// see https://github.com/seaweedfs/seaweedfs/issues/7442 before relying on it.
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
	return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")

	for _, replicas := range volumeReplicas {
		// Filter for read-only replicas: only these are candidates for repair targets.
		var readOnlyReplicas []*VolumeReplica
		for _, replica := range replicas {
			if replica.info.ReadOnly {
				readOnlyReplicas = append(readOnlyReplicas, replica)
			}
		}

		// Skip if no read-only replicas — nothing to repair for this volume id.
		if len(readOnlyReplicas) == 0 {
			continue
		}

		// Find a writable source replica to sync from (prefer the one with highest file count,
		// on the assumption it is the most complete copy).
		var sourceReplica *VolumeReplica
		for _, replica := range replicas {
			if !replica.info.ReadOnly {
				if sourceReplica == nil || replica.info.FileCount > sourceReplica.info.FileCount {
					sourceReplica = replica
				}
			}
		}

		// If no writable source found, fall back to the read-only replica with the highest
		// file count as the source, and remove it from the target list so it is never
		// synced into (uni-directional sync below leaves the source unmodified).
		if sourceReplica == nil {
			vcd.writeVerbose("no writable source found for volume %d, using read-only replica with highest file count\n", readOnlyReplicas[0].info.Id)
			slices.SortFunc(readOnlyReplicas, func(a, b *VolumeReplica) int {
				// Descending by FileCount.
				// NOTE(review): if FileCount is an unsigned type, b-a underflows when
				// a > b and correctness depends on the int conversion wrapping
				// negative — confirm the field's type, or compare explicitly.
				return int(b.info.FileCount - a.info.FileCount)
			})
			sourceReplica = readOnlyReplicas[0]
			readOnlyReplicas = readOnlyReplicas[1:]
		}

		// Perform uni-directional sync from source to each read-only target.
		for _, target := range readOnlyReplicas {
			// In fast mode, skip pairs that a cheap pre-check deems already in sync.
			if !vcd.slowMode {
				shouldSkip, err := vcd.shouldSkipVolume(sourceReplica, target)
				if err != nil {
					vcd.write("error checking if volume %d should be skipped: %v\n", sourceReplica.info.Id, err)
					// Continue with sync despite error to be safe: a failed pre-check
					// must not prevent an actual repair attempt.
				} else if shouldSkip {
					continue
				}
			}

			vcd.write("syncing read-only volume %d from %s to %s (uni-directional)\n",
				sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id)

			// Use uni-directional sync (bidi=false) to avoid modifying source.
			// A failed sync is logged and iteration continues, so one bad target
			// does not abort repair of the remaining targets.
			if err := vcd.syncTwoReplicas(sourceReplica, target, false); err != nil {
				vcd.write("sync read-only volume %d from %s to %s: %v\n",
					sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id, err)
			}
		}
	}

	return nil
}
|
|
|
|
|
|
|
func (vcd *volumeCheckDisk) isLocked() bool { |
|
|
|
@ -295,13 +361,13 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi |
|
|
|
iteration++ |
|
|
|
vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) |
|
|
|
|
|
|
|
prevSourceHasChanges, prevDestHasChanges := sourceHasChanges, targetHasChanges |
|
|
|
prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges |
|
|
|
if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// Detect if we're stuck in a loop with no progress
|
|
|
|
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevDestHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { |
|
|
|
if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { |
|
|
|
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", |
|
|
|
source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) |
|
|
|
return fmt.Errorf("sync not making progress after %d iterations", iteration) |
|
|
|
@ -338,24 +404,25 @@ func (vcd *volumeCheckDisk) checkBoth(source, target *VolumeReplica, bidi bool) |
|
|
|
} |
|
|
|
|
|
|
|
// find and make up the differences
|
|
|
|
errStrs := []string{} |
|
|
|
targetHasChanges, err = vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target) |
|
|
|
if err != nil { |
|
|
|
errStrs = append( |
|
|
|
errStrs, |
|
|
|
fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, err)) |
|
|
|
var errs []error |
|
|
|
targetHasChanges, errTarget := vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target) |
|
|
|
if errTarget != nil { |
|
|
|
errs = append(errs, |
|
|
|
fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w", |
|
|
|
source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, errTarget)) |
|
|
|
} |
|
|
|
sourceHasChanges = false |
|
|
|
if bidi { |
|
|
|
sourceHasChanges, err = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source) |
|
|
|
if err != nil { |
|
|
|
errStrs = append( |
|
|
|
errStrs, |
|
|
|
fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, err)) |
|
|
|
var errSource error |
|
|
|
sourceHasChanges, errSource = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source) |
|
|
|
if errSource != nil { |
|
|
|
errs = append(errs, |
|
|
|
fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w", |
|
|
|
target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, errSource)) |
|
|
|
} |
|
|
|
} |
|
|
|
if len(errStrs) != 0 { |
|
|
|
return sourceHasChanges, targetHasChanges, errors.New(strings.Join(errStrs, ", ")) |
|
|
|
if len(errs) > 0 { |
|
|
|
return sourceHasChanges, targetHasChanges, errors.Join(errs...) |
|
|
|
} |
|
|
|
|
|
|
|
return sourceHasChanges, targetHasChanges, nil |
|
|
|
|