Browse Source

Split logic for `volume.check.disk` into writable and read-only volume replicas. (#7476)

pull/7095/merge
Lisandro Pin 3 days ago
committed by GitHub
parent
commit
0e69f7c916
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 38
      weed/shell/command_volume_check_disk.go

38
weed/shell/command_volume_check_disk.go

@ -66,10 +66,11 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
slowMode := fsckCommand.Bool("slow", false, "slow mode checks all replicas even file counts are the same")
verbose := fsckCommand.Bool("v", false, "verbose mode")
volumeId := fsckCommand.Uint("volumeId", 0, "the volume id")
volumeId := fsckCommand.Uint("volumeId", 0, "the volume ID (0 for all)")
applyChanges := fsckCommand.Bool("apply", false, "apply the fix")
// TODO: remove this alias
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
forceReadonly := fsckCommand.Bool("force-readonly", false, "apply the fix even on readonly volumes")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
@ -100,13 +101,37 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err != nil {
return err
}
// collect volume replicas, optionally filtered by volume ID
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
if vid := uint32(*volumeId); vid > 0 {
if replicas, ok := volumeReplicas[vid]; ok {
volumeReplicas = map[uint32][]*VolumeReplica{
vid: replicas,
}
} else {
return fmt.Errorf("volume %d not found", vid)
}
}
vcd.write("Pass #1 (writeable volumes)\n")
if err := vcd.checkWriteableVolumes(volumeReplicas); err != nil {
return err
}
if *forceReadonly {
vcd.write("Pass #2 (read-only volumes)\n")
if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
return err
}
}
return nil
}
// checkWriteableVolumes fixes volume replicas which are not read-only.
func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
// pick 1 pairs of volume replica
for _, replicas := range volumeReplicas {
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
continue
}
// filter readonly replica
var writableReplicas []*VolumeReplica
for _, replica := range replicas {
@ -148,6 +173,11 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
// checkReadOnlyVolumes fixes read-only volume replicas.
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")
}
func (vcd *volumeCheckDisk) isLocked() bool {
return vcd.commandEnv.isLocked()
}

Loading…
Cancel
Save