diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 3b2c63cc7..ad31bc676 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
 }
 
 type ecRebuilder struct {
-	// TODO: add ErrorWaitGroup for parallelization
 	commandEnv   *CommandEnv
 	ecNodes      []*EcNode
 	writer       io.Writer
 	applyChanges bool
 	collections  []string
+
+	ewg       *ErrorWaitGroup
+	ecNodesMu sync.Mutex
 }
 
 type commandEcRebuild struct {
@@ -71,13 +74,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 
 	fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+	maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
 	applyChanges := fixCommand.Bool("apply", false, "apply the changes")
 	// TODO: remove this alias
 	applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
 	if err = fixCommand.Parse(args); err != nil {
 		return nil
 	}
-
 	handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
 	infoAboutSimulationMode(writer, *applyChanges, "-apply")
 
@@ -107,17 +110,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
 		writer:       writer,
 		applyChanges: *applyChanges,
 		collections:  collections,
+
+		ewg: NewErrorWaitGroup(*maxParallelization),
 	}
 
 	fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
 	for _, c := range collections {
-		fmt.Printf("rebuildEcVolumes collection %s\n", c)
-		if err = erb.rebuildEcVolumes(c); err != nil {
-			return err
-		}
+		erb.rebuildEcVolumes(c)
 	}
 
-	return nil
+	return erb.ewg.Wait()
 }
 
 func (erb *ecRebuilder) write(format string, a ...any) {
@@ -128,10 +130,13 @@ func (erb *ecRebuilder) isLocked() bool {
 	return erb.commandEnv.isLocked()
 }
 
-// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
-func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
+// ecNodeWithMoreFreeSlots returns the EC node with the highest free slot count, from all nodes visible to the rebuilder, along with its free slot count.
+func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() (*EcNode, int) {
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
+
 	if len(erb.ecNodes) == 0 {
-		return nil
+		return nil, 0
 	}
 
 	res := erb.ecNodes[0]
@@ -141,11 +146,11 @@ func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
 		}
 	}
 
-	return res
+	return res, res.freeEcSlot
 }
 
-func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
-	fmt.Printf("rebuildEcVolumes %s\n", collection)
+func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
+	fmt.Printf("rebuildEcVolumes for %q\n", collection)
 
 	// collect vid => each shard locations, similar to ecShardMap in topology.go
 	ecShardMap := make(EcShardMap)
@@ -153,37 +158,37 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
 		ecShardMap.registerEcNode(ecNode, collection)
 	}
 
+	rebuilder, freeSlots := erb.ecNodeWithMoreFreeSlots()
+	if freeSlots < erasure_coding.TotalShardsCount {
+		erb.ewg.Add(func() error {
+			return fmt.Errorf("disk space is not enough")
+		})
+		return
+	}
+
 	for vid, locations := range ecShardMap {
 		shardCount := locations.shardCount()
 		if shardCount == erasure_coding.TotalShardsCount {
 			continue
 		}
 		if shardCount < erasure_coding.DataShardsCount {
-			return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			erb.ewg.Add(func() error {
+				return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
+			})
+			return
 		}
 
-		if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
-			return err
-		}
+		erb.ewg.Add(func() error {
+			return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
+		})
 	}
-
-	return nil
 }
 
-func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
+func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
 	if !erb.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
-	// TODO: fix this logic so it supports concurrent executions
-	rebuilder := erb.ecNodeWithMoreFreeSlots()
-	if rebuilder == nil {
-		return fmt.Errorf("no EC nodes available for rebuild")
-	}
-	if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
-		return fmt.Errorf("disk space is not enough")
-	}
-
 	fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
 
 	// collect shard files to rebuilder local disk
@@ -219,6 +224,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
 		return err
 	}
 
+	// ensure EcNode updates are atomic
+	erb.ecNodesMu.Lock()
+	defer erb.ecNodesMu.Unlock()
 	rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
 
 	return nil
diff --git a/weed/shell/command_ec_rebuild_test.go b/weed/shell/command_ec_rebuild_test.go
index 5ab431137..7543b008c 100644
--- a/weed/shell/command_ec_rebuild_test.go
+++ b/weed/shell/command_ec_rebuild_test.go
@@ -118,14 +118,16 @@ func TestEcRebuilderEcNodeWithMoreFreeSlots(t *testing.T) {
 				ecNodes: tc.nodes,
 			}
 
-			node := erb.ecNodeWithMoreFreeSlots()
+			node, freeEcSlots := erb.ecNodeWithMoreFreeSlots()
 			if node == nil {
 				t.Fatal("Expected a node, got nil")
 			}
-
 			if node.info.Id != tc.expectedNode {
 				t.Errorf("Expected node %s, got %s", tc.expectedNode, node.info.Id)
 			}
+			if node.freeEcSlot != freeEcSlots {
+				t.Errorf("Expected node with %d free EC slots, got %d", freeEcSlots, node.freeEcSlot)
+			}
 		})
 	}
 }
@@ -136,10 +138,13 @@ func TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty(t *testing.T) {
 		ecNodes: []*EcNode{},
 	}
 
-	node := erb.ecNodeWithMoreFreeSlots()
+	node, freeEcSlots := erb.ecNodeWithMoreFreeSlots()
 	if node != nil {
 		t.Errorf("Expected nil for empty node list, got %v", node)
 	}
+	if freeEcSlots != 0 {
+		t.Errorf("Expected no free EC slots, got %d", freeEcSlots)
+	}
 }
 
 // TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes
@@ -155,15 +160,17 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:     NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes: []*EcNode{node1},
 		writer:  &logBuffer,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err == nil {
 		t.Fatal("Expected error for insufficient shards, got nil")
 	}
-
 	if !strings.Contains(err.Error(), "unrepairable") {
 		t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error())
 	}
@@ -182,12 +189,15 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes:      []*EcNode{node1},
 		writer:       &logBuffer,
 		applyChanges: false,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err != nil {
 		t.Fatalf("Expected no error for complete volume, got: %v", err)
 	}
@@ -209,16 +219,18 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
 			env:    make(map[string]string),
 			noLock: true, // Bypass lock check for unit test
 		},
+		ewg:          NewErrorWaitGroup(DefaultMaxParallelization),
 		ecNodes:      []*EcNode{node1},
 		writer:       &logBuffer,
 		applyChanges: false,
 	}
 
-	err := erb.rebuildEcVolumes("c1")
+	erb.rebuildEcVolumes("c1")
+	err := erb.ewg.Wait()
+
 	if err == nil {
 		t.Fatal("Expected error for insufficient disk space, got nil")
 	}
-
 	if !strings.Contains(err.Error(), "disk space is not enough") {
 		t.Errorf("Expected 'disk space' in error message, got: %s", err.Error())
 	}