|
|
@ -116,8 +116,10 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer |
|
|
|
|
|
|
|
unlock := c.Lock(job.src) |
|
|
|
|
|
|
|
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, applyChanges); err != nil { |
|
|
|
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) |
|
|
|
if applyChanges { |
|
|
|
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst); err != nil { |
|
|
|
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err) |
|
|
|
} |
|
|
|
} |
|
|
|
unlock() |
|
|
|
} |
|
|
@ -216,34 +218,27 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, applyChanges bool) (err error) { |
|
|
|
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) { |
|
|
|
|
|
|
|
// mark all replicas as read only
|
|
|
|
if applyChanges { |
|
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { |
|
|
|
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) |
|
|
|
} |
|
|
|
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil { |
|
|
|
|
|
|
|
// mark all replicas as writable
|
|
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { |
|
|
|
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err) |
|
|
|
} |
|
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { |
|
|
|
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) |
|
|
|
} |
|
|
|
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil { |
|
|
|
|
|
|
|
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) |
|
|
|
// mark all replicas as writable
|
|
|
|
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { |
|
|
|
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// adjust volume count
|
|
|
|
dst.dataNode.DiskInfos[string(toDiskType)].VolumeCount++ |
|
|
|
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) |
|
|
|
} |
|
|
|
|
|
|
|
// remove the remaining replicas
|
|
|
|
for _, loc := range locations { |
|
|
|
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer { |
|
|
|
if applyChanges { |
|
|
|
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil { |
|
|
|
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err) |
|
|
|
} |
|
|
|
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil { |
|
|
|
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err) |
|
|
|
} |
|
|
|
// reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to
|
|
|
|
} |
|
|
|