diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 0e76f6ac9..669bbaf3b 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -5,6 +5,12 @@ import ( "context" "flag" "fmt" + "io" + "math" + "net/http" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -13,11 +19,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "golang.org/x/exp/slices" "google.golang.org/grpc" - "io" - "math" - "net/http" - "sync" - "time" ) func init() { @@ -141,23 +142,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { continue } - slices.SortFunc(replicas, func(a, b *VolumeReplica) int { + // filter readonly replica + var writableReplicas []*VolumeReplica + for _, replica := range replicas { + if replica.info.ReadOnly { + fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + } else { + writableReplicas = append(writableReplicas, replica) + } + } + + slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int { return int(b.info.FileCount - a.info.FileCount) }) - for len(replicas) >= 2 { - a, b := replicas[0], replicas[1] - replicas = replicas[1:] - if a.info.ReadOnly || b.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - continue - } + for len(writableReplicas) >= 2 { + a, b := writableReplicas[0], writableReplicas[1] if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { + // always choose the larger volume to be the source + writableReplicas = append(replicas[:1], writableReplicas[2:]...) continue } if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } + // always choose the larger volume to be the source + if a.info.FileCount > b.info.FileCount { + writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) + } else { + writableReplicas = writableReplicas[1:] + } } } @@ -191,13 +204,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a } // find and make up the differences - if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) + aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + if err1 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } - if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) + if err2 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2) } - return + return aHasChanges, bHasChanges, nil } func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {