Browse Source

`ec.encode`: Fix bug causing source volumes not being deleted after EC conversion. (#6447)

This logic was originally part of `spreadEcShards()`, which got removed during
the unification effort with `ec.balance` (https://github.com/seaweedfs/seaweedfs/pull/6344),
accidentally breaking functionality in the process.

The commit restores the deletion code for EC'd volumes - with parallelization support.
pull/6224/merge
Lisandro Pin 4 days ago
committed by GitHub
parent
commit
eab2e0e112
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 19
      weed/shell/command_ec_encode.go

19
weed/shell/command_ec_encode.go

@ -132,6 +132,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
} }
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error { func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error {
var ewg *ErrorWaitGroup
if !commandEnv.isLocked() { if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost") return fmt.Errorf("lock is lost")
} }
@ -143,7 +145,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
} }
// mark the volume as readonly // mark the volume as readonly
ewg := NewErrorWaitGroup(maxParallelization)
ewg = NewErrorWaitGroup(maxParallelization)
for _, location := range locations { for _, location := range locations {
ewg.Add(func() error { ewg.Add(func() error {
if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil { if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
@ -161,6 +163,21 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
} }
// ask the source volume server to delete the original volume
ewg = NewErrorWaitGroup(maxParallelization)
for _, location := range locations {
ewg.Add(func() error {
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false); err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
}
fmt.Printf("deleted volume %d from %s\n", vid, location.Url)
return nil
})
}
if err := ewg.Wait(); err != nil {
return err
}
return nil return nil
} }

Loading…
Cancel
Save