From ee775825bcd46911adeb811241bad56836dab05f Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Tue, 2 Dec 2025 18:29:27 +0100 Subject: [PATCH] Parallelize read-only volume check pass for `volume.check.disk`. (#7602) --- weed/shell/command_volume_check_disk.go | 52 ++++++++++++++----------- weed/shell/common.go | 8 ++++ 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index d7b015979..78b2486dd 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -40,6 +40,8 @@ type volumeCheckDisk struct { syncDeletions bool fixReadOnly bool nonRepairThreshold float64 + + ewg *ErrorWaitGroup } func (c *commandVolumeCheckDisk) Name() string { @@ -92,6 +94,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)") fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)") syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") + maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -115,6 +118,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write syncDeletions: *syncDeletions, fixReadOnly: *fixReadOnly, nonRepairThreshold: *nonRepairThreshold, + + ewg: NewErrorWaitGroup(*maxParallelization), } // collect topology information @@ -137,11 +142,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err := vcd.checkWritableVolumes(volumeReplicas); err != nil { return err } - if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil { - return err - } + vcd.checkReadOnlyVolumes(volumeReplicas) - return nil + return vcd.ewg.Wait() } // checkWritableVolumes fixes volume replicas which are not read-only. @@ -228,9 +231,9 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er return nil } -func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { +func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) { if !vcd.fixReadOnly { - return nil + return } vcd.write("Pass #2 (read-only volumes)\n") @@ -261,35 +264,38 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo skip, err := vcd.shouldSkipVolume(r, source) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err) + vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err) continue } if skip { continue } - // make volume writable... - if err := vcd.makeVolumeWritable(vid, r); err != nil { - return err - } + vcd.ewg.Add(func() error { + // make volume writable... + if err := vcd.makeVolumeWritable(vid, r); err != nil { + return err + } + + // ...fix it... + // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. + if err := vcd.syncTwoReplicas(source, r, false); err != nil { + vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) - // ...fix it... - // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. - if err := vcd.syncTwoReplicas(source, r, false); err != nil { - vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) + // ...or revert it back to read-only, if something went wrong. + // TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full. + if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { + return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + } + vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - // ...or revert it back to read-only, if something went wrong. - if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { - return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + return err } - vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - return err - } + return nil + }) } } - - return nil } func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { diff --git a/weed/shell/common.go b/weed/shell/common.go index 43571176e..cb2df5828 100644 --- a/weed/shell/common.go +++ b/weed/shell/common.go @@ -2,6 +2,7 @@ package shell import ( "errors" + "fmt" "sync" ) @@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { }() } +// AddErrorf adds an error to an ErrorWaitGroupTask result, without queueing any goroutines. +func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) { + ewg.errorsMu.Lock() + ewg.errors = append(ewg.errors, fmt.Errorf(format, a...)) + ewg.errorsMu.Unlock() +} + // Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them. func (ewg *ErrorWaitGroup) Wait() error { ewg.wg.Wait()