Browse Source

Added ability to change replication settings upon volume.tier.move (#3583)

pull/3585/head
Brian 2 years ago
committed by GitHub
parent
commit
a28b668647
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      weed/shell/command_volume_tier_move.go

35
weed/shell/command_volume_tier_move.go

@ -12,7 +12,11 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"time" "time"
"context"
"errors"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
) )
@ -40,7 +44,7 @@ func (c *commandVolumeTierMove) Name() string {
func (c *commandVolumeTierMove) Help() string { func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another return `change a volume from one disk type to another
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4]
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped. Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed. So "volume.fix.replication" and "volume.balance" should be followed.
@ -59,6 +63,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs") parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes") applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move") ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
replicationString := tierCommand.String("toReplication", "", "the new target replication setting");
if err = tierCommand.Parse(args); err != nil { if err = tierCommand.Parse(args); err != nil {
return nil return nil
} }
@ -119,7 +125,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src) unlock := c.Lock(job.src)
if applyChanges { if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
} }
} }
@ -220,7 +226,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil return nil
} }
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string ) (err error) {
if !commandEnv.isLocked() { if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost") return fmt.Errorf("lock is lost")
@ -230,8 +236,9 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable // mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err) glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
@ -240,6 +247,26 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
} }
// If move is successful and replication is not empty, alter moved volume's replication setting
if *replicationString != "" {
err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: *replicationString,
})
if configureErr != nil {
return configureErr
}
if resp.Error != "" {
return errors.New(resp.Error)
}
return nil
})
if err != nil {
glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
}
}
// remove the remaining replicas // remove the remaining replicas
for _, loc := range locations { for _, loc := range locations {
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer { if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {

Loading…
Cancel
Save