diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index a1d899d15..afbf90b44 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -9,6 +9,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 
 	"google.golang.org/grpc"
 
@@ -98,76 +99,74 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
 		}
 	}
 
-	var collections []string
 	var volumeIds []needle.VolumeId
+	var balanceCollections []string
 
 	if vid := needle.VolumeId(*volumeId); vid != 0 {
 		// volumeId is provided
 		volumeIds = append(volumeIds, vid)
-		collections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
+		balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
 	} else {
 		// apply to all volumes for the given collection
 		volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
 		if err != nil {
 			return err
 		}
-		collections = append(collections, *collection)
+		balanceCollections = []string{*collection}
 	}
 
 	// encode all requested volumes...
-	for _, vid := range volumeIds {
-		if err = doEcEncode(commandEnv, *collection, vid, *maxParallelization); err != nil {
-			return fmt.Errorf("ec encode for volume %d: %v", vid, err)
-		}
+	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
+		return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
 	}
 
 	// ...then re-balance ec shards.
-	if err := EcBalance(commandEnv, collections, "", rp, *maxParallelization, *applyBalancing); err != nil {
-		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
+	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
+		return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
 	}
 
 	return nil
 }
 
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error {
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
 	var ewg *ErrorWaitGroup
 
 	if !commandEnv.isLocked() {
 		return fmt.Errorf("lock is lost")
 	}
 
-	// find volume location
-	locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
-	if !found {
-		return fmt.Errorf("volume %d not found", vid)
+	// resolve volume locations
+	locations := map[needle.VolumeId][]wdclient.Location{}
+	for _, vid := range volumeIds {
+		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
+		if !ok {
+			return fmt.Errorf("volume %d not found", vid)
+		}
+		locations[vid] = ls
 	}
-	target := locations[0]
 
-	// mark the volume as readonly
+	// mark volumes as readonly
 	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, location := range locations {
-		ewg.Add(func() error {
-			if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
-				return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
-			}
-			return nil
-		})
+	for _, vid := range volumeIds {
+		for _, l := range locations[vid] {
+			ewg.Add(func() error {
+				if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, l, false, false); err != nil {
+					return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, l.Url, err)
+				}
+				return nil
+			})
+		}
 	}
 	if err := ewg.Wait(); err != nil {
 		return err
 	}
 
 	// generate ec shards
-	if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
-		return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
-	}
-
-	// ask the source volume server to delete the original volume
 	ewg = NewErrorWaitGroup(maxParallelization)
-	for _, location := range locations {
+	for _, vid := range volumeIds {
+		target := locations[vid][0]
 		ewg.Add(func() error {
-			if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false); err != nil {
-				return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
+			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
+				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
			}
-			fmt.Printf("deleted volume %d from %s\n", vid, location.Url)
 			return nil
 		})
 	}
@@ -175,13 +174,41 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
 		return err
 	}
 
+	// ask the source volume server to delete the original volume
+	ewg = NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		for _, l := range locations[vid] {
+			ewg.Add(func() error {
+				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+				}
+				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+				return nil
+			})
+		}
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
+	}
+
 	// mount all ec shards for the converted volume
 	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
 	for i := range shardIds {
 		shardIds[i] = uint32(i)
 	}
-	if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
-		return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+
+	ewg = NewErrorWaitGroup(maxParallelization)
+	for _, vid := range volumeIds {
+		target := locations[vid][0]
+		ewg.Add(func() error {
+			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
+				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+			}
+			return nil
+		})
+	}
+	if err := ewg.Wait(); err != nil {
+		return err
 	}
 
 	return nil
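
Every stage of this change funnels its per-volume (or per-replica) work through `ErrorWaitGroup`, whose `NewErrorWaitGroup(maxParallelization)` / `Add(func() error)` / `Wait() error` surface is visible in the diff but whose implementation lives elsewhere in `weed/shell`. Below is a minimal, self-contained sketch of the semantics the diff appears to rely on; the semaphore-based internals and first-error behavior here are assumptions for illustration, not the project's actual code. Note that the tasks capture the loop variables `vid` and `l` directly, which is safe under Go 1.22+ per-iteration loop-variable scoping.

```go
package main

import (
	"fmt"
	"sync"
)

// ErrorWaitGroup sketch (assumed semantics): Add schedules a task, at most
// maxConcurrency tasks run at once, and Wait blocks until all tasks finish,
// returning the first error encountered (nil if none failed).
type ErrorWaitGroup struct {
	wg       sync.WaitGroup
	tokens   chan struct{} // counting semaphore bounding concurrency
	mu       sync.Mutex    // guards firstErr
	firstErr error
}

func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
	return &ErrorWaitGroup{tokens: make(chan struct{}, maxConcurrency)}
}

func (e *ErrorWaitGroup) Add(f func() error) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		e.tokens <- struct{}{}        // acquire a concurrency slot
		defer func() { <-e.tokens }() // release it when the task ends
		if err := f(); err != nil {
			e.mu.Lock()
			if e.firstErr == nil {
				e.firstErr = err
			}
			e.mu.Unlock()
		}
	}()
}

func (e *ErrorWaitGroup) Wait() error {
	e.wg.Wait()
	return e.firstErr
}

func main() {
	// Usage mirrors the pattern in doEcEncode: queue one task per item,
	// then barrier on Wait and propagate the first failure.
	ewg := NewErrorWaitGroup(4)
	for i := 0; i < 10; i++ {
		ewg.Add(func() error {
			fmt.Println("task", i) // per-iteration i: safe capture in Go 1.22+
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```

One design consequence of batching all volumes per stage (readonly, generate, delete, mount) rather than running the stages per volume: no volume's original replicas are deleted until `generateEcShards` has succeeded for every volume in the batch, since each stage's `ewg.Wait()` acts as a barrier before the next stage begins.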