
Parallelize `ec.rebuild` operations per affected volume.

Lisandro Pin · 2 months ago
branch: pull/7466/head
commit: 0c79057017
  1. weed/shell/command_ec_rebuild.go (66 changed lines)
  2. weed/shell/command_ec_rebuild_test.go (28 changed lines)
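
The diff below replaces the serial per-collection loop with tasks scheduled on an `ErrorWaitGroup`. That helper is defined elsewhere in the `weed/shell` package and is not part of this commit; the following is only a minimal sketch, assuming `NewErrorWaitGroup(n)` caps concurrency at `n`, `Add` schedules a task, and `Wait` blocks until all tasks finish and returns their combined error. It is an illustrative approximation of the semantics the rebuild code depends on, not the actual implementation.

```go
// Sketch of an ErrorWaitGroup-style helper: bounded parallelism plus error aggregation.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type ErrorWaitGroup struct {
	wg   sync.WaitGroup
	sem  chan struct{} // bounds the number of tasks running at once
	mu   sync.Mutex    // protects errs
	errs []error
}

func NewErrorWaitGroup(maxParallelization int) *ErrorWaitGroup {
	if maxParallelization < 1 {
		maxParallelization = 1
	}
	return &ErrorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

// Add schedules f to run as soon as a concurrency slot is free.
func (ewg *ErrorWaitGroup) Add(f func() error) {
	ewg.wg.Add(1)
	go func() {
		defer ewg.wg.Done()
		ewg.sem <- struct{}{}        // acquire a slot
		defer func() { <-ewg.sem }() // release it when the task ends
		if err := f(); err != nil {
			ewg.mu.Lock()
			ewg.errs = append(ewg.errs, err)
			ewg.mu.Unlock()
		}
	}()
}

// Wait joins all scheduled tasks and returns their errors combined (nil if none failed).
func (ewg *ErrorWaitGroup) Wait() error {
	ewg.wg.Wait()
	return errors.Join(ewg.errs...)
}

func main() {
	ewg := NewErrorWaitGroup(2) // run at most two tasks at a time
	for i := 0; i < 5; i++ {
		i := i
		ewg.Add(func() error {
			if i == 3 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		fmt.Println(err)
	}
}
```

With this pattern, errors are surfaced through the shared `Wait()` at the end of `Do`, so a failure in one collection no longer prevents the remaining collections from being scheduled.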

weed/shell/command_ec_rebuild.go (66 changed lines)

@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -18,12 +19,14 @@ func init() {
}
type ecRebuilder struct {
// TODO: add ErrorWaitGroup for parallelization
commandEnv *CommandEnv
ecNodes []*EcNode
writer io.Writer
applyChanges bool
collections []string
ewg *ErrorWaitGroup
ecNodesMu sync.Mutex
}
type commandEcRebuild struct {
@@ -71,13 +74,13 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
maxParallelization := fixCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
applyChanges := fixCommand.Bool("apply", false, "apply the changes")
// TODO: remove this alias
applyChangesAlias := fixCommand.Bool("force", false, "apply the changes (alias for -apply)")
if err = fixCommand.Parse(args); err != nil {
return nil
}
handleDeprecatedForceFlag(writer, fixCommand, applyChangesAlias, applyChanges)
infoAboutSimulationMode(writer, *applyChanges, "-apply")
@@ -107,17 +110,16 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
writer: writer,
applyChanges: *applyChanges,
collections: collections,
ewg: NewErrorWaitGroup(*maxParallelization),
}
fmt.Printf("rebuildEcVolumes for %d collection(s)\n", len(collections))
for _, c := range collections {
fmt.Printf("rebuildEcVolumes collection %s\n", c)
if err = erb.rebuildEcVolumes(c); err != nil {
return err
}
erb.rebuildEcVolumes(c)
}
return nil
return erb.ewg.Wait()
}
func (erb *ecRebuilder) write(format string, a ...any) {
@@ -128,10 +130,13 @@ func (erb *ecRebuilder) isLocked() bool {
return erb.commandEnv.isLocked()
}
// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder.
func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
// ecNodeWithMoreFreeSlots returns the EC node with higher free slot count, from all nodes visible to the rebuilder, and its free slot count.
func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() (*EcNode, int) {
erb.ecNodesMu.Lock()
defer erb.ecNodesMu.Unlock()
if len(erb.ecNodes) == 0 {
return nil
return nil, 0
}
res := erb.ecNodes[0]
@@ -141,11 +146,11 @@ func (erb *ecRebuilder) ecNodeWithMoreFreeSlots() *EcNode {
}
}
return res
return res, res.freeEcSlot
}
func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
fmt.Printf("rebuildEcVolumes %s\n", collection)
func (erb *ecRebuilder) rebuildEcVolumes(collection string) {
fmt.Printf("rebuildEcVolumes for %q\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
ecShardMap := make(EcShardMap)
@@ -153,37 +158,37 @@ func (erb *ecRebuilder) rebuildEcVolumes(collection string) error {
ecShardMap.registerEcNode(ecNode, collection)
}
rebuilder, freeSlots := erb.ecNodeWithMoreFreeSlots()
if freeSlots < erasure_coding.TotalShardsCount {
erb.ewg.Add(func() error {
return fmt.Errorf("disk space is not enough")
})
return
}
for vid, locations := range ecShardMap {
shardCount := locations.shardCount()
if shardCount == erasure_coding.TotalShardsCount {
continue
}
if shardCount < erasure_coding.DataShardsCount {
return fmt.Errorf("ec volume %d is unrepairable with %d shards", vid, shardCount)
erb.ewg.Add(func() error {
return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount)
})
return
}
if err := erb.rebuildOneEcVolume(collection, vid, locations); err != nil {
return err
}
erb.ewg.Add(func() error {
return erb.rebuildOneEcVolume(collection, vid, locations, rebuilder)
})
}
return nil
}
func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations) error {
func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.VolumeId, locations EcShardLocations, rebuilder *EcNode) error {
if !erb.isLocked() {
return fmt.Errorf("lock is lost")
}
// TODO: fix this logic so it supports concurrent executions
rebuilder := erb.ecNodeWithMoreFreeSlots()
if rebuilder == nil {
return fmt.Errorf("no EC nodes available for rebuild")
}
if rebuilder.freeEcSlot < erasure_coding.TotalShardsCount {
return fmt.Errorf("disk space is not enough")
}
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
@@ -219,6 +224,9 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
return err
}
// ensure ECNode updates are atomic
erb.ecNodesMu.Lock()
defer erb.ecNodesMu.Unlock()
rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
return nil
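
With per-volume rebuilds now running concurrently, the shared `ecNodes` state is guarded by `ecNodesMu`: `ecNodeWithMoreFreeSlots` locks before scanning the node list, and the post-rebuild bookkeeping locks before calling `addEcVolumeShards`. Below is a minimal sketch of that pattern; the types and field names are simplified stand-ins, not the real `EcNode`.

```go
// Sketch of the locking pattern: concurrent rebuild tasks share one node list,
// so both the read path (picking the node with the most free slots) and the
// write path (registering newly generated shards) take the same mutex.
package main

import (
	"fmt"
	"sync"
)

type node struct {
	id        string
	freeSlots int
}

type rebuilder struct {
	mu    sync.Mutex
	nodes []*node
}

// pickNode mirrors ecNodeWithMoreFreeSlots: lock, scan, and also report the
// free-slot count so callers need not touch the node again without the lock.
func (r *rebuilder) pickNode() (*node, int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.nodes) == 0 {
		return nil, 0
	}
	best := r.nodes[0]
	for _, n := range r.nodes[1:] {
		if n.freeSlots > best.freeSlots {
			best = n
		}
	}
	return best, best.freeSlots
}

// registerShards mimics the post-rebuild bookkeeping: updates to shared node
// state happen under the same mutex so parallel volume rebuilds do not race.
func (r *rebuilder) registerShards(n *node, shardCount int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	n.freeSlots -= shardCount
}

func main() {
	r := &rebuilder{nodes: []*node{{id: "dn1", freeSlots: 28}}}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if n, free := r.pickNode(); n != nil && free > 0 {
				r.registerShards(n, 14) // one rebuilt EC volume's worth of shards
			}
		}()
	}
	wg.Wait()
	n, free := r.pickNode()
	fmt.Println(n.id, free)
}
```

Returning the free-slot count alongside the node also lets `rebuildEcVolumes` perform the capacity check once, up front, before any per-volume task is enqueued.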

weed/shell/command_ec_rebuild_test.go (28 changed lines)

@@ -118,14 +118,16 @@ func TestEcRebuilderEcNodeWithMoreFreeSlots(t *testing.T) {
ecNodes: tc.nodes,
}
node := erb.ecNodeWithMoreFreeSlots()
node, freeEcSlots := erb.ecNodeWithMoreFreeSlots()
if node == nil {
t.Fatal("Expected a node, got nil")
}
if node.info.Id != tc.expectedNode {
t.Errorf("Expected node %s, got %s", tc.expectedNode, node.info.Id)
}
if node.freeEcSlot != freeEcSlots {
t.Errorf("Expected node with %d free EC slots, got %d", freeEcSlots, node.freeEcSlot)
}
})
}
}
@@ -136,10 +138,13 @@ func TestEcRebuilderEcNodeWithMoreFreeSlotsEmpty(t *testing.T) {
ecNodes: []*EcNode{},
}
node := erb.ecNodeWithMoreFreeSlots()
node, freeEcSlots := erb.ecNodeWithMoreFreeSlots()
if node != nil {
t.Errorf("Expected nil for empty node list, got %v", node)
}
if freeEcSlots != 0 {
t.Errorf("Expected no free EC slots, got %d", freeEcSlots)
}
}
// TestRebuildEcVolumesInsufficientShards tests error handling for unrepairable volumes
@@ -155,15 +160,17 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
}
err := erb.rebuildEcVolumes("c1")
erb.rebuildEcVolumes("c1")
err := erb.ewg.Wait()
if err == nil {
t.Fatal("Expected error for insufficient shards, got nil")
}
if !strings.Contains(err.Error(), "unrepairable") {
t.Errorf("Expected 'unrepairable' in error message, got: %s", err.Error())
}
@@ -182,12 +189,15 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
applyChanges: false,
}
err := erb.rebuildEcVolumes("c1")
erb.rebuildEcVolumes("c1")
err := erb.ewg.Wait()
if err != nil {
t.Fatalf("Expected no error for complete volume, got: %v", err)
}
@@ -209,16 +219,18 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
env: make(map[string]string),
noLock: true, // Bypass lock check for unit test
},
ewg: NewErrorWaitGroup(DefaultMaxParallelization),
ecNodes: []*EcNode{node1},
writer: &logBuffer,
applyChanges: false,
}
err := erb.rebuildEcVolumes("c1")
erb.rebuildEcVolumes("c1")
err := erb.ewg.Wait()
if err == nil {
t.Fatal("Expected error for insufficient disk space, got nil")
}
if !strings.Contains(err.Error(), "disk space is not enough") {
t.Errorf("Expected 'disk space' in error message, got: %s", err.Error())
}
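
For completeness, a hedged example of driving the reworked command from the weed shell. The flag names match the diff above; the parallelism value and the surrounding lock/unlock steps are illustrative assumptions, and the default for `-maxParallelization` is whatever `DefaultMaxParallelization` is set to elsewhere in the shell package.

```
> lock
> ec.rebuild -collection EACH_COLLECTION -maxParallelization 10         # simulate only
> ec.rebuild -collection EACH_COLLECTION -maxParallelization 10 -apply  # apply the changes
> unlock
```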
