
Improve parallelization for `ec.encode` (#6769)

Instead of processing one volume at a time, perform each EC conversion
step (mark readonly -> generate EC shards -> delete volume -> remount) in
parallel across all volumes.

This should substantially improve performance when EC encoding
entire collections.
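
In outline, the change replaces the old per-volume loop (run all four steps for volume A, then for volume B, ...) with phase-level fan-out: each step runs for every volume concurrently, bounded by the command's maxParallelization option, and all volumes must finish a step before the next one begins. The following is an illustrative, self-contained sketch of that control flow only; the step names, worker-pool plumbing, and constants are stand-ins, not code from the repository:

package main

import (
	"fmt"
	"sync"
)

// Sketch of the commit's control flow (not repository code): every EC step
// fans out across all volumes with a bounded number of workers, then joins
// before the next step starts.
func main() {
	volumes := []uint32{1, 2, 3, 4, 5}
	steps := []string{"mark readonly", "generate EC shards", "delete volume", "remount"}
	const maxParallelization = 3 // stand-in for the command's option

	for _, step := range steps {
		var wg sync.WaitGroup
		sem := make(chan struct{}, maxParallelization) // limits in-flight work
		for _, vid := range volumes {
			vid := vid // capture per-iteration value (pre-Go 1.22 safety)
			wg.Add(1)
			go func() {
				defer wg.Done()
				sem <- struct{}{}        // acquire a worker slot
				defer func() { <-sem }() // release it
				fmt.Printf("%s: volume %d\n", step, vid)
			}()
		}
		wg.Wait() // barrier: every volume finishes this step before the next
	}
}

The barrier between steps preserves the original safety ordering (a volume is never deleted before its shards are generated) while still letting slow per-volume RPCs overlap.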
Lisandro Pin, committed via GitHub 5 days ago (commit 97dad06ed8)
Changed file: weed/shell/command_ec_encode.go (71 lines changed)
@@ -9,6 +9,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 	"google.golang.org/grpc"
@@ -98,79 +99,94 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		}
 	}
 
-	var collections []string
 	var volumeIds []needle.VolumeId
+	var balanceCollections []string
 	if vid := needle.VolumeId(*volumeId); vid != 0 {
 		// volumeId is provided
 		volumeIds = append(volumeIds, vid)
-		collections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
+		balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
 	} else {
 		// apply to all volumes for the given collection
 		volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
 		if err != nil {
 			return err
 		}
-		collections = append(collections, *collection)
+		balanceCollections = []string{*collection}
 	}
 
 	// encode all requested volumes...
-	for _, vid := range volumeIds {
-		if err = doEcEncode(commandEnv, *collection, vid, *maxParallelization); err != nil {
-			return fmt.Errorf("ec encode for volume %d: %v", vid, err)
-		}
+	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
+		return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
 	}
 	// ...then re-balance ec shards.
-	if err := EcBalance(commandEnv, collections, "", rp, *maxParallelization, *applyBalancing); err != nil {
-		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
+	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
+		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
 	}
 
 	return nil
 }
 
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error {
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
+	var ewg *ErrorWaitGroup
+
 	if !commandEnv.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
-	// find volume location
-	locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
-	if !found {
-		return fmt.Errorf("volume %d not found", vid)
-	}
-	target := locations[0]
+	// resolve volume locations
+	locations := map[needle.VolumeId][]wdclient.Location{}
+	for _, vid := range volumeIds {
+		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
+		if !ok {
+			return fmt.Errorf("volume %d not found", vid)
+		}
+		locations[vid] = ls
+	}
 
-	// mark the volume as readonly
-	ewg := NewErrorWaitGroup(maxParallelization)
-	for _, location := range locations {
-		ewg.Add(func() error {
-			if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
-				return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
-			}
-			return nil
-		})
+	// mark volumes as readonly
+	ewg = NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		for _, l := range locations[vid] {
+			ewg.Add(func() error {
+				if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, l, false, false); err != nil {
+					return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, l.Url, err)
+				}
				return nil
+			})
+		}
 	}
 	if err := ewg.Wait(); err != nil {
 		return err
 	}
 
 	// generate ec shards
-	if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
-		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
+	ewg = NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		target := locations[vid][0]
+		ewg.Add(func() error {
+			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
+				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
+			}
+			return nil
+		})
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
 	}
 
 	// ask the source volume server to delete the original volume
 	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, location := range locations {
-		ewg.Add(func() error {
-			if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false); err != nil {
-				return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
-			}
-			fmt.Printf("deleted volume %d from %s\n", vid, location.Url)
-			return nil
-		})
+	for _, vid := range volumeIds {
+		for _, l := range locations[vid] {
+			ewg.Add(func() error {
+				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+				}
+				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+				return nil
+			})
+		}
 	}
 	if err := ewg.Wait(); err != nil {
 		return err
 	}
@@ -180,9 +196,20 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
 	for i := range shardIds {
 		shardIds[i] = uint32(i)
 	}
-	if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
-		return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+	ewg = NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		target := locations[vid][0]
+		ewg.Add(func() error {
+			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
+				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+			}
+			return nil
+		})
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
 	}
 
 	return nil
 }
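
The diff leans on the ErrorWaitGroup helper (NewErrorWaitGroup / Add / Wait) from the weed/shell package to bound concurrency and surface failures. For readers outside the codebase, the following is a hedged sketch of how such a helper can be built; the actual type in weed/shell may differ in details such as how multiple errors are aggregated:

package main

import (
	"fmt"
	"sync"
)

// errorWaitGroup is a sketch in the spirit of weed/shell's ErrorWaitGroup;
// it is NOT the actual implementation. It runs tasks with at most
// maxConcurrency in flight and reports the first failure from Wait.
type errorWaitGroup struct {
	wg       sync.WaitGroup
	sem      chan struct{} // capacity = max concurrent tasks
	mu       sync.Mutex
	firstErr error
}

func newErrorWaitGroup(maxConcurrency int) *errorWaitGroup {
	return &errorWaitGroup{sem: make(chan struct{}, maxConcurrency)}
}

// Add schedules fn; it runs as soon as a concurrency slot is available.
func (e *errorWaitGroup) Add(fn func() error) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		e.sem <- struct{}{}        // acquire slot
		defer func() { <-e.sem }() // release slot
		if err := fn(); err != nil {
			e.mu.Lock()
			if e.firstErr == nil {
				e.firstErr = err // keep the first failure; real code may join all
			}
			e.mu.Unlock()
		}
	}()
}

// Wait blocks until every scheduled task has finished.
func (e *errorWaitGroup) Wait() error {
	e.wg.Wait()
	return e.firstErr
}

func main() {
	ewg := newErrorWaitGroup(4)
	for i := 0; i < 8; i++ {
		i := i // capture per-iteration value (pre-Go 1.22 safety)
		ewg.Add(func() error {
			if i == 5 {
				return fmt.Errorf("task %d failed", i)
			}
			return nil
		})
	}
	fmt.Println("result:", ewg.Wait()) // prints the first error, if any
}

Bounding the fan-out this way keeps EC-encoding a large collection from opening an unbounded number of simultaneous gRPC calls against the volume servers.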
