|
|
@ -5,6 +5,12 @@ import ( |
|
|
|
"context" |
|
|
|
"flag" |
|
|
|
"fmt" |
|
|
|
"io" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" |
|
|
@ -13,11 +19,6 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map" |
|
|
|
"golang.org/x/exp/slices" |
|
|
|
"google.golang.org/grpc" |
|
|
|
"io" |
|
|
|
"math" |
|
|
|
"net/http" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
) |
|
|
|
|
|
|
|
func init() { |
|
|
@ -141,23 +142,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write |
|
|
|
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { |
|
|
|
continue |
|
|
|
} |
|
|
|
slices.SortFunc(replicas, func(a, b *VolumeReplica) int { |
|
|
|
// filter readonly replica
|
|
|
|
var writableReplicas []*VolumeReplica |
|
|
|
for _, replica := range replicas { |
|
|
|
if replica.info.ReadOnly { |
|
|
|
fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) |
|
|
|
} else { |
|
|
|
writableReplicas = append(writableReplicas, replica) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int { |
|
|
|
return int(b.info.FileCount - a.info.FileCount) |
|
|
|
}) |
|
|
|
for len(replicas) >= 2 { |
|
|
|
a, b := replicas[0], replicas[1] |
|
|
|
replicas = replicas[1:] |
|
|
|
if a.info.ReadOnly || b.info.ReadOnly { |
|
|
|
fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", |
|
|
|
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) |
|
|
|
continue |
|
|
|
} |
|
|
|
for len(writableReplicas) >= 2 { |
|
|
|
a, b := writableReplicas[0], writableReplicas[1] |
|
|
|
if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { |
|
|
|
// always choose the larger volume to be the source
|
|
|
|
writableReplicas = append(replicas[:1], writableReplicas[2:]...) |
|
|
|
continue |
|
|
|
} |
|
|
|
if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { |
|
|
|
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) |
|
|
|
} |
|
|
|
// always choose the larger volume to be the source
|
|
|
|
if a.info.FileCount > b.info.FileCount { |
|
|
|
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) |
|
|
|
} else { |
|
|
|
writableReplicas = writableReplicas[1:] |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -191,13 +204,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a |
|
|
|
} |
|
|
|
|
|
|
|
// find and make up the differences
|
|
|
|
if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { |
|
|
|
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) |
|
|
|
aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) |
|
|
|
bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) |
|
|
|
if err1 != nil { |
|
|
|
return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) |
|
|
|
} |
|
|
|
if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { |
|
|
|
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) |
|
|
|
if err2 != nil { |
|
|
|
return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2) |
|
|
|
} |
|
|
|
return |
|
|
|
return aHasChanges, bHasChanges, nil |
|
|
|
} |
|
|
|
|
|
|
|
func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { |
|
|
|