
Improve safety for weed shell's `ec.encode`. (#6773)

Improve safety for weed shell's `ec.encode`.

The current process for `ec.encode` is:

1. EC shards for a volume are generated and added to a single server
2. The original volume is deleted
3. EC shards get re-balanced across the entire topology

It is then possible to lose data between steps 2 and 3: until the re-balance completes, all EC shards for a volume sit on a single server while the original volume is already gone, so a failure of that server's storage, the server itself, or its rack/DC, for whatever reason, loses the data. As a fix, this MR reworks `ec.encode` (see the sketch below the list) so that:

  * Newly created EC shards are spread across all locations for the source volume.
  * Source volumes are deleted only after EC shards are converted and balanced.
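
For illustration, here is a minimal, self-contained Go sketch of the reworked ordering. The three step functions are hypothetical stand-ins, not the real SeaweedFS helpers (`doEcEncode`, `EcBalance`, and `doDeleteVolumes` in the diff below); only the order of operations mirrors the change.

```go
package main

import "fmt"

// Stand-in stubs; the real work happens in weed/shell/command_ec_encode.go.
func encodeToECShards(vid int) error   { fmt.Println("generate EC shards for volume", vid); return nil }
func balanceECShards() error           { fmt.Println("re-balance EC shards across the topology"); return nil }
func deleteSourceVolume(vid int) error { fmt.Println("delete original volume", vid); return nil }

func main() {
	vid := 123
	// 1. Generate EC shards, spread across the volume's replica locations.
	if err := encodeToECShards(vid); err != nil {
		panic(err)
	}
	// 2. Re-balance the shards across the whole topology first...
	if err := balanceECShards(); err != nil {
		panic(err)
	}
	// 3. ...and only then delete the source volume, closing the data-loss window.
	if err := deleteSourceVolume(vid); err != nil {
		panic(err)
	}
}
```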
Branch: pull/6777/head
Lisandro Pin authored 3 days ago, committed by GitHub
Commit 848d1f7c34
Changed files:
  1. weed/shell/command_ec_common.go (8 lines changed)
  2. weed/shell/command_ec_encode.go (84 lines changed)

weed/shell/command_ec_common.go

@@ -134,6 +134,14 @@ func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
 	}
 }
 
+func (ewg *ErrorWaitGroup) Reset() {
+	close(ewg.wgSem)
+	ewg.wg = &sync.WaitGroup{}
+	ewg.wgSem = make(chan bool, ewg.maxConcurrency)
+	ewg.errors = nil
+}
+
 func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
 	if ewg.maxConcurrency <= 1 {
 		// Keep run order deterministic when parallelization is off
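
The new `Reset()` lets `doEcEncode` run several sequential phases (mark read-only, generate shards, mount shards) through a single wait group instead of allocating a new one per phase. A hedged, self-contained sketch of that usage pattern follows; the `errorWaitGroup` below is a simplified stand-in, not the actual `ErrorWaitGroup` from `command_ec_common.go`.

```go
package main

import (
	"fmt"
	"sync"
)

// errorWaitGroup is a simplified stand-in for shell.ErrorWaitGroup, used only to
// demonstrate the Reset-between-phases pattern that ec.encode relies on.
type errorWaitGroup struct {
	maxConcurrency int
	wg             *sync.WaitGroup
	sem            chan bool
	mu             sync.Mutex
	errors         []error
}

func newErrorWaitGroup(maxConcurrency int) *errorWaitGroup {
	return &errorWaitGroup{
		maxConcurrency: maxConcurrency,
		wg:             &sync.WaitGroup{},
		sem:            make(chan bool, maxConcurrency),
	}
}

// Reset re-arms the group for another phase, mirroring the method added above.
func (ewg *errorWaitGroup) Reset() {
	close(ewg.sem)
	ewg.wg = &sync.WaitGroup{}
	ewg.sem = make(chan bool, ewg.maxConcurrency)
	ewg.errors = nil
}

func (ewg *errorWaitGroup) Add(f func() error) {
	ewg.wg.Add(1)
	go func() {
		defer ewg.wg.Done()
		ewg.sem <- true              // acquire a concurrency slot
		defer func() { <-ewg.sem }() // release it
		if err := f(); err != nil {
			ewg.mu.Lock()
			ewg.errors = append(ewg.errors, err)
			ewg.mu.Unlock()
		}
	}()
}

func (ewg *errorWaitGroup) Wait() error {
	ewg.wg.Wait()
	if len(ewg.errors) > 0 {
		return ewg.errors[0]
	}
	return nil
}

func main() {
	ewg := newErrorWaitGroup(4)

	// Phase 1: e.g. mark volume replicas read-only.
	for i := 0; i < 3; i++ {
		i := i
		ewg.Add(func() error { fmt.Println("phase 1, task", i); return nil })
	}
	if err := ewg.Wait(); err != nil {
		panic(err)
	}

	// Phase 2: e.g. generate EC shards, reusing the same wait group.
	ewg.Reset()
	for i := 0; i < 3; i++ {
		i := i
		ewg.Add(func() error { fmt.Println("phase 2, task", i); return nil })
	}
	if err := ewg.Wait(); err != nil {
		panic(err)
	}
}
```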

weed/shell/command_ec_encode.go

@@ -118,33 +118,42 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
 		return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
 	}
 
-	// ...then re-balance ec shards.
+	// ...re-balance ec shards...
 	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
 		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
 	}
 
+	// ...then delete original volumes.
+	if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
+		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
+	}
+
 	return nil
 }
 
-func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
-	var ewg *ErrorWaitGroup
-
-	if !commandEnv.isLocked() {
-		return fmt.Errorf("lock is lost")
-	}
-
-	// resolve volume locations
-	locations := map[needle.VolumeId][]wdclient.Location{}
+func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
+	res := map[needle.VolumeId][]wdclient.Location{}
 	for _, vid := range volumeIds {
 		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
 		if !ok {
-			return fmt.Errorf("volume %d not found", vid)
+			return nil, fmt.Errorf("volume %d not found", vid)
 		}
-		locations[vid] = ls
+		res[vid] = ls
 	}
 
+	return res, nil
+}
+
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
+	if !commandEnv.isLocked() {
+		return fmt.Errorf("lock is lost")
+	}
+
+	locations, err := volumeLocations(commandEnv, volumeIds)
+	if err != nil {
+		return nil
+	}
+
 	// mark volumes as readonly
-	ewg = NewErrorWaitGroup(maxParallelization)
+	ewg := NewErrorWaitGroup(maxParallelization)
 	for _, vid := range volumeIds {
 		for _, l := range locations[vid] {
 			ewg.Add(func() error {
@@ -160,9 +169,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
 	}
 
 	// generate ec shards
-	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, vid := range volumeIds {
-		target := locations[vid][0]
+	ewg.Reset()
+	for i, vid := range volumeIds {
+		target := locations[vid][i%len(locations[vid])]
 		ewg.Add(func() error {
 			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
 				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
@@ -174,39 +183,50 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
 		return err
 	}
 
-	// ask the source volume server to delete the original volume
-	ewg = NewErrorWaitGroup(maxParallelization)
+	// mount all ec shards for the converted volume
+	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
+	for i := range shardIds {
+		shardIds[i] = uint32(i)
+	}
+
+	ewg.Reset()
 	for _, vid := range volumeIds {
-		for _, l := range locations[vid] {
+		target := locations[vid][0]
 		ewg.Add(func() error {
-				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
-					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
+				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
 			}
-				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
 			return nil
 		})
-		}
 	}
 	if err := ewg.Wait(); err != nil {
 		return err
 	}
 
-	// mount all ec shards for the converted volume
-	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
-	for i := range shardIds {
-		shardIds[i] = uint32(i)
+	return nil
 }
 
+func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
+	if !commandEnv.isLocked() {
+		return fmt.Errorf("lock is lost")
+	}
+
+	locations, err := volumeLocations(commandEnv, volumeIds)
+	if err != nil {
+		return nil
+	}
+
-	ewg = NewErrorWaitGroup(maxParallelization)
+	ewg := NewErrorWaitGroup(maxParallelization)
 	for _, vid := range volumeIds {
-		target := locations[vid][0]
+		for _, l := range locations[vid] {
 			ewg.Add(func() error {
-			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
-				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
 				}
+				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
 				return nil
 			})
+		}
 	}
 	if err := ewg.Wait(); err != nil {
 		return err
 	}
@@ -216,7 +236,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
 func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
-	fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
+	fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)
 
 	err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 		_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
