Browse Source

Parallelize read-only volume check pass for `volume.check.disk`. (#7602)

pull/7612/head
Lisandro Pin 1 day ago
committed by GitHub
parent
commit
ee775825bc
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 52
      weed/shell/command_volume_check_disk.go
  2. 8
      weed/shell/common.go

52
weed/shell/command_volume_check_disk.go

@ -40,6 +40,8 @@ type volumeCheckDisk struct {
syncDeletions bool syncDeletions bool
fixReadOnly bool fixReadOnly bool
nonRepairThreshold float64 nonRepairThreshold float64
ewg *ErrorWaitGroup
} }
func (c *commandVolumeCheckDisk) Name() string { func (c *commandVolumeCheckDisk) Name() string {
@ -92,6 +94,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)") applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)") fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil { if err = fsckCommand.Parse(args); err != nil {
return nil return nil
@ -115,6 +118,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
syncDeletions: *syncDeletions, syncDeletions: *syncDeletions,
fixReadOnly: *fixReadOnly, fixReadOnly: *fixReadOnly,
nonRepairThreshold: *nonRepairThreshold, nonRepairThreshold: *nonRepairThreshold,
ewg: NewErrorWaitGroup(*maxParallelization),
} }
// collect topology information // collect topology information
@ -137,11 +142,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err := vcd.checkWritableVolumes(volumeReplicas); err != nil { if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
return err return err
} }
if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
return err
}
vcd.checkReadOnlyVolumes(volumeReplicas)
return nil
return vcd.ewg.Wait()
} }
// checkWritableVolumes fixes volume replicas which are not read-only. // checkWritableVolumes fixes volume replicas which are not read-only.
@ -228,9 +231,9 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
return nil return nil
} }
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) {
if !vcd.fixReadOnly { if !vcd.fixReadOnly {
return nil
return
} }
vcd.write("Pass #2 (read-only volumes)\n") vcd.write("Pass #2 (read-only volumes)\n")
@ -261,35 +264,38 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
skip, err := vcd.shouldSkipVolume(r, source) skip, err := vcd.shouldSkipVolume(r, source)
if err != nil { if err != nil {
vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err)
continue continue
} }
if skip { if skip {
continue continue
} }
// make volume writable...
if err := vcd.makeVolumeWritable(vid, r); err != nil {
return err
}
vcd.ewg.Add(func() error {
// make volume writable...
if err := vcd.makeVolumeWritable(vid, r); err != nil {
return err
}
// ...fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
if err := vcd.syncTwoReplicas(source, r, false); err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...fix it...
// TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
if err := vcd.syncTwoReplicas(source, r, false); err != nil {
vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
// ...or revert it back to read-only, if something went wrong.
// TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full.
if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
}
vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
// ...or revert it back to read-only, if something went wrong.
if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
return err
} }
vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
return err
}
return nil
})
} }
} }
return nil
} }
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {

8
weed/shell/common.go

@ -2,6 +2,7 @@ package shell
import ( import (
"errors" "errors"
"fmt"
"sync" "sync"
) )
@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
}() }()
} }
// AddErrorf adds an error to an ErrorWaitGroupTask result, without queueing any goroutines.
func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) {
ewg.errorsMu.Lock()
ewg.errors = append(ewg.errors, fmt.Errorf(format, a...))
ewg.errorsMu.Unlock()
}
// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them. // Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
func (ewg *ErrorWaitGroup) Wait() error { func (ewg *ErrorWaitGroup) Wait() error {
ewg.wg.Wait() ewg.wg.Wait()

Loading…
Cancel
Save