Browse Source

Better volume.tier.move performance

pull/5637/head
Sakuragawa Misty 10 months ago
parent
commit
93040b8240
  1. 29
      weed/shell/command_volume_tier_move.go

29
weed/shell/command_volume_tier_move.go

@ -64,6 +64,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
applyChange := tierCommand.Bool("force", false, "actually apply the changes") applyChange := tierCommand.Bool("force", false, "actually apply the changes")
ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move") ioBytePerSecond := tierCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
replicationString := tierCommand.String("toReplication", "", "the new target replication setting") replicationString := tierCommand.String("toReplication", "", "the new target replication setting")
disableSrcParallelLimit := tierCommand.Bool("disableSrcParallelLimit", false, "limit each src server to only one job")
if err = tierCommand.Parse(args); err != nil { if err = tierCommand.Parse(args); err != nil {
return nil return nil
@ -103,7 +104,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
bufferLen := len(allLocations)
//bufferLen := len(allLocations)
bufferLen := 0
c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob) c.queues = make(map[pb.ServerAddress]chan volumeTierMoveJob)
for _, dst := range allLocations { for _, dst := range allLocations {
@ -114,6 +116,9 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) { go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
defer wg.Done() defer wg.Done()
for job := range jobs { for job := range jobs {
if job.src == "" && job.vid == 0 {
continue // ignore testing job
}
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString()) fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(job.vid)) locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(job.vid))
@ -122,11 +127,16 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
continue continue
} }
unlock := c.Lock(job.src)
unlock := func() {}
if !*disableSrcParallelLimit { // when srcParallelLimit disabled, don't do the locking limit
unlock = c.Lock(job.src)
}
if applyChanges { if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil { if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
} else {
fmt.Fprintf(writer, "tier moved volume %d %s => %s\n", job.vid, job.src, dst.dataNode.Id)
} }
} }
unlock() unlock()
@ -193,6 +203,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
// find one server with the most empty volume slots with target disk type // find one server with the most empty volume slots with target disk type
hasFoundTarget := false hasFoundTarget := false
hasAvailableTarget := false
fn := capacityByFreeVolumeCount(toDiskType) fn := capacityByFreeVolumeCount(toDiskType)
for _, dst := range allLocations { for _, dst := range allLocations {
if fn(dst.dataNode) > 0 && !hasFoundTarget { if fn(dst.dataNode) > 0 && !hasFoundTarget {
@ -209,6 +220,14 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
if sourceVolumeServer == "" { if sourceVolumeServer == "" {
continue continue
} }
hasAvailableTarget = true
select {
case c.queues[pb.NewServerAddressFromDataNode(dst.dataNode)] <- volumeTierMoveJob{"", 0}:
break
default:
continue
}
hasFoundTarget = true hasFoundTarget = true
// adjust volume count // adjust volume count
@ -219,8 +238,12 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
} }
} }
if !hasFoundTarget {
if !hasAvailableTarget {
fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid) fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
} else if !hasFoundTarget {
fmt.Fprintf(writer, "waiting for free worker for disk type %s volume %d\n", toDiskType.ReadableString(), vid)
time.Sleep(5 * time.Second)
return c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations)
} }
return nil return nil

Loading…
Cancel
Save