diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index c49ab7611..209d8a733 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -134,6 +134,14 @@ func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
 	}
 }
 
+func (ewg *ErrorWaitGroup) Reset() {
+	close(ewg.wgSem)
+
+	ewg.wg = &sync.WaitGroup{}
+	ewg.wgSem = make(chan bool, ewg.maxConcurrency)
+	ewg.errors = nil
+}
+
 func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
 	if ewg.maxConcurrency <= 1 {
 		// Keep run order deterministic when parallelization is off
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index afbf90b44..d9e3a88c6 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -118,33 +118,42 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
 		return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
 	}
-	// ...then re-balance ec shards.
+	// ...re-balance ec shards...
 	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
 		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
 	}
+	// ...then delete original volumes.
+	if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
+		return fmt.Errorf("delete original volumes %v: %v", volumeIds, err)
+	}
 
 	return nil
 }
 
-func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
-	var ewg *ErrorWaitGroup
-
-	if !commandEnv.isLocked() {
-		return fmt.Errorf("lock is lost")
-	}
-
-	// resolve volume locations
-	locations := map[needle.VolumeId][]wdclient.Location{}
+func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
+	res := map[needle.VolumeId][]wdclient.Location{}
 	for _, vid := range volumeIds {
 		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
 		if !ok {
-			return fmt.Errorf("volume %d not found", vid)
+			return nil, fmt.Errorf("volume %d not found", vid)
 		}
-		locations[vid] = ls
+		res[vid] = ls
+	}
+
+	return res, nil
+}
+
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
+	if !commandEnv.isLocked() {
+		return fmt.Errorf("lock is lost")
+	}
+	locations, err := volumeLocations(commandEnv, volumeIds)
+	if err != nil {
+		return err
 	}
 
 	// mark volumes as readonly
-	ewg = NewErrorWaitGroup(maxParallelization)
+	ewg := NewErrorWaitGroup(maxParallelization)
 	for _, vid := range volumeIds {
 		for _, l := range locations[vid] {
 			ewg.Add(func() error {
@@ -160,9 +169,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
 	}
 
 	// generate ec shards
-	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, vid := range volumeIds {
-		target := locations[vid][0]
+	ewg.Reset()
+	for i, vid := range volumeIds {
+		target := locations[vid][i%len(locations[vid])]
 		ewg.Add(func() error {
 			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
 				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
@@ -174,30 +183,13 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
 		return err
 	}
 
-	// ask the source volume server to delete the original volume
-	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, vid := range volumeIds {
-		for _, l := range locations[vid] {
-			ewg.Add(func() error {
-				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
-					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
-				}
-				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
-				return nil
-			})
-		}
-	}
-	if err := ewg.Wait(); err != nil {
-		return err
-	}
-
 	// mount all ec shards for the converted volume
 	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
 	for i := range shardIds {
 		shardIds[i] = uint32(i)
 	}
 
-	ewg = NewErrorWaitGroup(maxParallelization)
+	ewg.Reset()
 	for _, vid := range volumeIds {
 		target := locations[vid][0]
 		ewg.Add(func() error {
@@ -214,9 +206,37 @@
 	return nil
 }
 
+func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
+	if !commandEnv.isLocked() {
+		return fmt.Errorf("lock is lost")
+	}
+	locations, err := volumeLocations(commandEnv, volumeIds)
+	if err != nil {
+		return err
+	}
+
+	ewg := NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		for _, l := range locations[vid] {
+			ewg.Add(func() error {
+				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+				}
+				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+				return nil
+			})
+		}
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
-	fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
+	fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)
 	err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 		_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{