Browse Source

adjust volume count even when not applying the changes

pull/2631/head
chrislu 3 years ago
parent
commit
b8490fe427
  1. 48
      weed/shell/command_volume_tier_move.go

48
weed/shell/command_volume_tier_move.go

@ -27,7 +27,7 @@ type volumeTierMoveJob struct {
type commandVolumeTierMove struct { type commandVolumeTierMove struct {
activeServers sync.Map activeServers sync.Map
queues map[pb.ServerAddress]chan volumeTierMoveJob
queues map[pb.ServerAddress]chan volumeTierMoveJob
//activeServers map[pb.ServerAddress]struct{} //activeServers map[pb.ServerAddress]struct{}
//activeServersLock sync.Mutex //activeServersLock sync.Mutex
//activeServersCond *sync.Cond //activeServersCond *sync.Cond
@ -40,7 +40,7 @@ func (c *commandVolumeTierMove) Name() string {
func (c *commandVolumeTierMove) Help() string { func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another return `change a volume from one disk type to another
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h]
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped. Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed. So "volume.fix.replication" and "volume.balance" should be followed.
@ -56,7 +56,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
source := tierCommand.String("fromDiskType", "", "the source disk type") source := tierCommand.String("fromDiskType", "", "the source disk type")
target := tierCommand.String("toDiskType", "", "the target disk type") target := tierCommand.String("toDiskType", "", "the target disk type")
limitWorkers := tierCommand.Int("limitWorkers", 0, "limit the number of active copying workers")
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("force", false, "actually apply the changes") applyChange := tierCommand.Bool("force", false, "actually apply the changes")
if err = tierCommand.Parse(args); err != nil { if err = tierCommand.Parse(args); err != nil {
return nil return nil
@ -90,8 +90,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
allLocations = filterLocationsByDiskType(allLocations, toDiskType) allLocations = filterLocationsByDiskType(allLocations, toDiskType)
keepDataNodesSorted(allLocations, toDiskType) keepDataNodesSorted(allLocations, toDiskType)
if len(allLocations) > 0 && *limitWorkers > 0 && *limitWorkers < len(allLocations) {
allLocations = allLocations[:*limitWorkers]
if len(allLocations) > 0 && *parallelLimit > 0 && *parallelLimit < len(allLocations) {
allLocations = allLocations[:*parallelLimit]
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -103,12 +103,10 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
c.queues[destServerAddress] = make(chan volumeTierMoveJob, bufferLen) c.queues[destServerAddress] = make(chan volumeTierMoveJob, bufferLen)
wg.Add(1) wg.Add(1)
go func (dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
defer wg.Done()
for job := range jobs { for job := range jobs {
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString()) fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
if !applyChanges {
continue
}
locations, found := commandEnv.MasterClient.GetLocations(uint32(job.vid)) locations, found := commandEnv.MasterClient.GetLocations(uint32(job.vid))
if !found { if !found {
@ -118,12 +116,11 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src) unlock := c.Lock(job.src)
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, applyChanges); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
} }
unlock() unlock()
} }
wg.Done()
}(dst, c.queues[destServerAddress], *applyChange) }(dst, c.queues[destServerAddress], *applyChange)
} }
@ -219,20 +216,22 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil return nil
} }
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, applyChanges bool) (err error) {
// mark all replicas as read only // mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
if applyChanges {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
}
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
}
} }
// adjust volume count // adjust volume count
@ -241,9 +240,12 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
// remove the remaining replicas // remove the remaining replicas
for _, loc := range locations { for _, loc := range locations {
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer { if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil {
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
if applyChanges {
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil {
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
}
} }
// reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to
} }
} }
return nil return nil

Loading…
Cancel
Save