From 76c48ffe27e339145f140301f9a865bec40b515c Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 6 May 2021 01:53:35 -0700
Subject: [PATCH] optional parallel copy ec shards

fix https://github.com/chrislusf/seaweedfs/issues/2048
---
 weed/shell/command_ec_encode.go | 47 +++++++++++++++++++--------------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 634cb11e2..8480bab06 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -63,6 +63,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	collection := encodeCommand.String("collection", "", "the collection name")
 	fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches this percentage of the max volume size")
 	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes with no writes for this period")
+	parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
 	if err = encodeCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -71,7 +72,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 
 	// volumeId is provided
 	if vid != 0 {
-		return doEcEncode(commandEnv, *collection, vid)
+		return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
 	}
 
 	// apply to all volumes in the collection
@@ -81,7 +82,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	}
 	fmt.Printf("ec encode volumes: %v\n", volumeIds)
 	for _, vid := range volumeIds {
-		if err = doEcEncode(commandEnv, *collection, vid); err != nil {
+		if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
 			return err
 		}
 	}
@@ -89,7 +90,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	return nil
 }
 
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {
@@ -111,7 +112,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId)
 	}
 
 	// balance the ec shards to current cluster
-	err = spreadEcShards(commandEnv, vid, collection, locations)
+	err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
 	if err != nil {
 		return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
 	}
@@ -157,7 +158,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
 
 }
 
-func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
 
 	allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
 	if err != nil {
@@ -176,7 +177,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
 	allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
 
 	// ask the data nodes to copy from the source volume server
-	copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
+	copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy)
 	if err != nil {
 		return err
 	}
@@ -206,30 +207,36 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
 
 }
 
-func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) {
 
 	fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
 
-	// parallelize
-	shardIdChan := make(chan []uint32, len(targetServers))
 	var wg sync.WaitGroup
+	shardIdChan := make(chan []uint32, len(targetServers))
+	copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
+		defer wg.Done()
+		copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
+			allocatedEcShardIds, volumeId, collection, existingLocation.Url)
+		if copyErr != nil {
+			err = copyErr
+		} else {
+			shardIdChan <- copiedShardIds
+			server.addEcVolumeShards(volumeId, collection, copiedShardIds)
+		}
+	}
+
+	// maybe parallelize
 	for i, server := range targetServers {
 		if len(allocatedEcIds[i]) <= 0 {
 			continue
 		}
 
 		wg.Add(1)
-		go func(server *EcNode, allocatedEcShardIds []uint32) {
-			defer wg.Done()
-			copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
-				allocatedEcShardIds, volumeId, collection, existingLocation.Url)
-			if copyErr != nil {
-				err = copyErr
-			} else {
-				shardIdChan <- copiedShardIds
-				server.addEcVolumeShards(volumeId, collection, copiedShardIds)
-			}
-		}(server, allocatedEcIds[i])
+		if parallelCopy {
+			go copyFunc(server, allocatedEcIds[i])
+		} else {
+			copyFunc(server, allocatedEcIds[i])
+		}
 	}
 	wg.Wait()
 	close(shardIdChan)
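
Note: the core of this change is the "maybe parallelize" pattern in the last hunk. The per-server copy moves into the copyFunc closure, and the new flag only decides whether each call runs in its own goroutine or inline; the sync.WaitGroup bookkeeping (wg.Add plus deferred wg.Done) is identical on both paths. Below is a minimal self-contained sketch of that same pattern, assuming a hypothetical copyShard helper in place of oneServerCopyAndMountEcShardsFromSource; it also collects failures through a buffered error channel, whereas the patch records the last copyErr into the shared err return:

package main

import (
	"fmt"
	"sync"
)

// copyShard stands in for the real per-server copy work
// (oneServerCopyAndMountEcShardsFromSource in the patch).
func copyShard(server string, shardIds []uint32) ([]uint32, error) {
	fmt.Printf("copying shards %v to %s\n", shardIds, server)
	return shardIds, nil
}

// copyAll runs copyShard once per server, fanning out to
// goroutines only when parallel is true.
func copyAll(servers map[string][]uint32, parallel bool) ([]uint32, error) {
	var wg sync.WaitGroup
	// Buffered to len(servers) so no sender can block before wg.Wait().
	resultChan := make(chan []uint32, len(servers))
	errChan := make(chan error, len(servers))

	copyFunc := func(server string, shardIds []uint32) {
		defer wg.Done()
		copied, copyErr := copyShard(server, shardIds)
		if copyErr != nil {
			errChan <- copyErr
			return
		}
		resultChan <- copied
	}

	// maybe parallelize: same bookkeeping on both paths,
	// only the go keyword differs.
	for server, shardIds := range servers {
		if len(shardIds) == 0 {
			continue
		}
		wg.Add(1)
		if parallel {
			go copyFunc(server, shardIds) // fan out
		} else {
			copyFunc(server, shardIds) // one server at a time
		}
	}
	wg.Wait()
	close(resultChan)
	close(errChan)

	// Receiving from the closed, possibly empty errChan yields nil
	// when every copy succeeded.
	if copyErr := <-errChan; copyErr != nil {
		return nil, copyErr
	}
	var copied []uint32
	for ids := range resultChan {
		copied = append(copied, ids...)
	}
	return copied, nil
}

func main() {
	servers := map[string][]uint32{
		"server1:8080": {0, 1, 2},
		"server2:8080": {3, 4, 5},
	}
	copied, err := copyAll(servers, true)
	fmt.Println(copied, err)
}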
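
With parallelCopy defaulting to true, the previous fan-out behavior is unchanged. Passing -parallelCopy=false to ec.encode in weed shell, alongside -collection or the per-volume flag (the -volumeId spelling is assumed from the surrounding file, not shown in this diff), copies the shards to one server at a time, trading encoding speed for a lighter concurrent load on the source volume server.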