diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 3f9859045..84bc730f4 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -9,7 +9,6 @@ import ( "io" "math" "net/http" - "strings" "time" "slices" @@ -53,8 +52,17 @@ func (c *commandVolumeCheckDisk) Help() string { find all volumes that are replicated for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count. for the pair volume A and B - append entries in A and not in B to B - append entries in B and not in A to A + bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A + uni-directional sync (read-only repair): only sync from source to target without modifying source + + Options: + -slow: check all replicas even if file counts are the same + -v: verbose mode with detailed progress output + -volumeId: check only a specific volume ID (0 for all) + -apply: actually apply the fixes (default is simulation mode) + -force-readonly: also check and repair read-only volumes using uni-directional sync + -syncDeleted: sync deletion records during repair + -nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3) ` } @@ -175,9 +183,67 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V return nil } -// checkReadOnlyVolumes fixes read-only volume replicas. +// checkReadOnlyVolumes fixes read-only volume replicas by performing uni-directional sync +// from a healthy source to read-only targets without modifying the source. func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { - return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)") + for _, replicas := range volumeReplicas { + // Filter for read-only replicas + var readOnlyReplicas []*VolumeReplica + for _, replica := range replicas { + if replica.info.ReadOnly { + readOnlyReplicas = append(readOnlyReplicas, replica) + } + } + + // Skip if no read-only replicas + if len(readOnlyReplicas) == 0 { + continue + } + + // Find a writable source replica to sync from (prefer the one with highest file count) + var sourceReplica *VolumeReplica + for _, replica := range replicas { + if !replica.info.ReadOnly { + if sourceReplica == nil || replica.info.FileCount > sourceReplica.info.FileCount { + sourceReplica = replica + } + } + } + + // If no writable source found, use the read-only replica with highest file count as source + if sourceReplica == nil { + vcd.writeVerbose("no writable source found for volume %d, using read-only replica with highest file count\n", readOnlyReplicas[0].info.Id) + slices.SortFunc(readOnlyReplicas, func(a, b *VolumeReplica) int { + return int(b.info.FileCount - a.info.FileCount) + }) + sourceReplica = readOnlyReplicas[0] + readOnlyReplicas = readOnlyReplicas[1:] + } + + // Perform uni-directional sync from source to each read-only target + for _, target := range readOnlyReplicas { + if !vcd.slowMode { + shouldSkip, err := vcd.shouldSkipVolume(sourceReplica, target) + if err != nil { + vcd.write("error checking if volume %d should be skipped: %v\n", sourceReplica.info.Id, err) + // Continue with sync despite error to be safe + } else if shouldSkip { + continue + } + } + + vcd.write("syncing read-only volume %d from %s to %s (uni-directional)\n", + sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id) + + // Use uni-directional sync (bidi=false) to avoid modifying source + if err := vcd.syncTwoReplicas(sourceReplica, target, false); err != nil { + vcd.write("sync read-only volume %d from %s to %s: %v\n", + sourceReplica.info.Id, sourceReplica.location.dataNode.Id, target.location.dataNode.Id, err) + } + } + } + + return nil } func (vcd *volumeCheckDisk) isLocked() bool { @@ -295,13 +361,13 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi iteration++ vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) - prevSourceHasChanges, prevDestHasChanges := sourceHasChanges, targetHasChanges + prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { return err } // Detect if we're stuck in a loop with no progress - if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevDestHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { + if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration) @@ -338,24 +404,25 @@ func (vcd *volumeCheckDisk) checkBoth(source, target *VolumeReplica, bidi bool) } // find and make up the differences - errStrs := []string{} - targetHasChanges, err = vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target) - if err != nil { - errStrs = append( - errStrs, - fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, err)) + var errs []error + targetHasChanges, errTarget := vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target) + if errTarget != nil { + errs = append(errs, + fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w", + source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, errTarget)) } sourceHasChanges = false if bidi { - sourceHasChanges, err = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source) - if err != nil { - errStrs = append( - errStrs, - fmt.Sprintf("doVolumeCheckDisk source:%s target:%s volume %d: %v", source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, err)) + var errSource error + sourceHasChanges, errSource = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source) + if errSource != nil { + errs = append(errs, + fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w", + target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, errSource)) } } - if len(errStrs) != 0 { - return sourceHasChanges, targetHasChanges, errors.New(strings.Join(errStrs, ", ")) + if len(errs) > 0 { + return sourceHasChanges, targetHasChanges, errors.Join(errs...) } return sourceHasChanges, targetHasChanges, nil