diff --git a/weed/shell/command_ec_scrub.go b/weed/shell/command_ec_scrub.go
index 587d5e5ef..64b474fe9 100644
--- a/weed/shell/command_ec_scrub.go
+++ b/weed/shell/command_ec_scrub.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -50,7 +51,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
 	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
 	// TODO: switch default mode to LOCAL, once implemented.
 	mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
-	// TODO: add per-node parallelization
+	maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 
 	if err = volScrubCommand.Parse(args); err != nil {
 		return err
@@ -102,45 +103,57 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
 	fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
 
 	c.env = commandEnv
-	return c.scrubEcVolumes(writer)
+	return c.scrubEcVolumes(writer, *maxParallelization)
 }
 
-func (c *commandEcVolumeScrub) scrubEcVolumes(writer io.Writer) error {
+func (c *commandEcVolumeScrub) scrubEcVolumes(writer io.Writer, maxParallelization int) error {
 	var brokenVolumesStr, brokenShardsStr []string
 	var details []string
 	var totalVolumes, brokenVolumes, brokenShards, totalFiles uint64
-
-	for i, addr := range c.volumeServerAddrs {
-		fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
-
-		err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-			res, err := volumeServerClient.ScrubEcVolume(context.Background(), &volume_server_pb.ScrubEcVolumeRequest{
-				Mode:      c.mode,
-				VolumeIds: c.volumeIDs,
+	var mu sync.Mutex
+
+	ewg := NewErrorWaitGroup(maxParallelization)
+	count := 0
+	for _, addr := range c.volumeServerAddrs {
+		ewg.Add(func() error {
+			mu.Lock()
+			count++
+			fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), count, len(c.volumeServerAddrs))
+			mu.Unlock()
+
+			err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				res, err := volumeServerClient.ScrubEcVolume(context.Background(), &volume_server_pb.ScrubEcVolumeRequest{
+					Mode:      c.mode,
+					VolumeIds: c.volumeIDs,
+				})
+				if err != nil {
+					return err
+				}
+
+				mu.Lock()
+				defer mu.Unlock()
+
+				totalVolumes += res.GetTotalVolumes()
+				totalFiles += res.GetTotalFiles()
+				brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
+				brokenShards += uint64(len(res.GetBrokenShardInfos()))
+				for _, d := range res.GetDetails() {
+					details = append(details, fmt.Sprintf("[%s] %s", addr, d))
+				}
+				for _, vid := range res.GetBrokenVolumeIds() {
+					brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
+				}
+				for _, si := range res.GetBrokenShardInfos() {
+					brokenShardsStr = append(brokenShardsStr, fmt.Sprintf("%s:%v:%v", addr, si.VolumeId, si.ShardId))
+				}
+
+				return nil
 			})
-			if err != nil {
-				return err
-			}
-
-			totalVolumes += res.GetTotalVolumes()
-			totalFiles += res.GetTotalFiles()
-			brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
-			brokenShards += uint64(len(res.GetBrokenShardInfos()))
-			for _, d := range res.GetDetails() {
-				details = append(details, fmt.Sprintf("[%s] %s", addr, d))
-			}
-			for _, vid := range res.GetBrokenVolumeIds() {
-				brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
-			}
-			for _, si := range res.GetBrokenShardInfos() {
-				brokenShardsStr = append(brokenShardsStr, fmt.Sprintf("%s:%v:%v", addr, si.VolumeId, si.ShardId))
-			}
-
-			return nil
-		})
-		if err != nil {
 			return err
-		}
+		})
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
 	}
 
 	fmt.Fprintf(writer, "Scrubbed %d EC files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))
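Both files lean on NewErrorWaitGroup and DefaultMaxParallelization, which are defined elsewhere in the weed/shell package and are not part of this patch. For readers without the rest of the tree handy, here is a minimal sketch of the contract the diff assumes: a wait group that caps the number of concurrently running tasks and returns the collected task errors from Wait(). The field names, the default value, and the error-aggregation policy below are illustrative assumptions, not the actual SeaweedFS code:

package shell

import (
	"errors"
	"sync"
)

// Assumed default; the real constant is defined elsewhere in weed/shell.
const DefaultMaxParallelization = 10

// ErrorWaitGroup (sketch): runs tasks on goroutines, at most
// maxConcurrency at a time, and collects their errors.
type ErrorWaitGroup struct {
	wg   sync.WaitGroup
	sem  chan struct{} // bounds the number of tasks in flight
	mu   sync.Mutex
	errs []error
}

func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
	if maxConcurrency <= 0 {
		maxConcurrency = 1
	}
	return &ErrorWaitGroup{sem: make(chan struct{}, maxConcurrency)}
}

// Add schedules f on its own goroutine, blocking the caller while
// maxConcurrency tasks are already running.
func (ewg *ErrorWaitGroup) Add(f func() error) {
	ewg.wg.Add(1)
	ewg.sem <- struct{}{} // acquire a slot
	go func() {
		defer func() {
			<-ewg.sem // release the slot
			ewg.wg.Done()
		}()
		if err := f(); err != nil {
			ewg.mu.Lock()
			ewg.errs = append(ewg.errs, err)
			ewg.mu.Unlock()
		}
	}()
}

// Wait blocks until every task finishes, then returns the collected
// errors joined together (nil if none failed).
func (ewg *ErrorWaitGroup) Wait() error {
	ewg.wg.Wait()
	return errors.Join(ewg.errs...)
}

The same change, minus the shard bookkeeping, is applied to the regular volume scrub command: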
diff --git a/weed/shell/command_volume_scrub.go b/weed/shell/command_volume_scrub.go
index c8a43729f..c6a24e13a 100644
--- a/weed/shell/command_volume_scrub.go
+++ b/weed/shell/command_volume_scrub.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -51,7 +52,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
 	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
 	// TODO: switch default mode to LOCAL, once implemented.
 	mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
-	// TODO: add per-node parallelization
+	maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 
 	if err = volScrubCommand.Parse(args); err != nil {
 		return err
@@ -103,41 +104,53 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
 	fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
 
 	c.env = commandEnv
-	return c.scrubVolumes(writer)
+	return c.scrubVolumes(writer, *maxParallelization)
 }
 
-func (c *commandVolumeScrub) scrubVolumes(writer io.Writer) error {
+func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization int) error {
 	var brokenVolumesStr []string
 	var details []string
 	var totalVolumes, brokenVolumes, totalFiles uint64
-
-	for i, addr := range c.volumeServerAddrs {
-		fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
-
-		err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
-			res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
-				Mode:      c.mode,
-				VolumeIds: c.volumeIDs,
+	var mu sync.Mutex
+
+	ewg := NewErrorWaitGroup(maxParallelization)
+	count := 0
+	for _, addr := range c.volumeServerAddrs {
+		ewg.Add(func() error {
+			mu.Lock()
+			count++
+			fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), count, len(c.volumeServerAddrs))
+			mu.Unlock()
+
+			err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
+					Mode:      c.mode,
+					VolumeIds: c.volumeIDs,
+				})
+				if err != nil {
+					return err
+				}
+
+				mu.Lock()
+				defer mu.Unlock()
+
+				totalVolumes += res.GetTotalVolumes()
+				totalFiles += res.GetTotalFiles()
+				brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
+				for _, d := range res.GetDetails() {
+					details = append(details, fmt.Sprintf("[%s] %s", addr, d))
+				}
+				for _, vid := range res.GetBrokenVolumeIds() {
+					brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
+				}
+
+				return nil
 			})
-			if err != nil {
-				return err
-			}
-
-			totalVolumes += res.GetTotalVolumes()
-			totalFiles += res.GetTotalFiles()
-			brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
-			for _, d := range res.GetDetails() {
-				details = append(details, fmt.Sprintf("[%s] %s", addr, d))
-			}
-			for _, vid := range res.GetBrokenVolumeIds() {
-				brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
-			}
-
-			return nil
-		})
-		if err != nil {
 			return err
-		}
+		})
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
 	}
 
 	fmt.Fprintf(writer, "Scrubbed %d files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))
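The change is mechanical and identical in both commands: the sequential per-node loop becomes a set of ErrorWaitGroup tasks, the shared accumulators (totalVolumes, details, brokenVolumesStr, and friends) are folded in under a single sync.Mutex, and the progress line now reports a mutex-guarded count in task-start order rather than the loop index. Note that the closures capture addr directly, which relies on Go's per-iteration loop-variable scoping (Go 1.22+); on older toolchains each closure would need a local addr := addr copy. From the weed shell, the new knob would be exercised roughly as follows (command names are inferred from the file names, since the Name() methods are outside this diff):

> volume.scrub -mode=index -maxParallelization=16
> ec.scrub -volumeId=1,2,3 -maxParallelization=16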