Browse Source

volume: add capability to change disk type when moving a volume

pull/1794/head
Chris Lu 4 years ago
parent
commit
770393a48c
  1. 1
      weed/pb/volume_server.proto
  2. 1252
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 6
      weed/server/volume_grpc_copy.go
  4. 2
      weed/shell/command_volume_balance.go
  5. 2
      weed/shell/command_volume_copy.go
  6. 13
      weed/shell/command_volume_move.go

1
weed/pb/volume_server.proto

@ -234,6 +234,7 @@ message VolumeCopyRequest {
string replication = 3;
string ttl = 4;
string source_data_node = 5;
string disk_type = 6;
}
message VolumeCopyResponse {
uint64 last_append_at_ns = 1;

1252
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

6
weed/server/volume_grpc_copy.go

@ -54,7 +54,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
location := vs.store.FindFreeLocation(storage.DiskType(volFileInfoResp.DiskType))
diskType := volFileInfoResp.DiskType
if req.DiskType != "" {
diskType = req.DiskType
}
location := vs.store.FindFreeLocation(storage.DiskType(diskType))
if location == nil {
return fmt.Errorf("no space left")
}

2
weed/shell/command_volume_balance.go

@ -332,7 +332,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, "")
}
return nil
}

2
weed/shell/command_volume_copy.go

@ -52,6 +52,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
_, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
_, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, "")
return
}

13
weed/shell/command_volume_move.go

@ -29,6 +29,7 @@ func (c *commandVolumeMove) Help() string {
return `move a live volume from one volume server to another volume server
volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id>
volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [ssd|hdd]
This command move a live volume from one volume server to another volume server. Here are the steps:
@ -40,6 +41,8 @@ func (c *commandVolumeMove) Help() string {
Now the master will mark this volume id as writable.
5. This command asks the source volume server to delete the source volume
The option "-disk [ssd|hdd]" can be used to change the volume disk type.
`
}
@ -53,6 +56,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id")
sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
diskTypeStr := volMoveCommand.String("disk", "", "[ssd|hdd] overwrite the original disk type")
if err = volMoveCommand.Parse(args); err != nil {
return nil
}
@ -65,14 +69,14 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
@ -91,7 +95,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so
return nil
}
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@ -134,6 +138,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: sourceVolumeServer,
DiskType: diskType,
})
if replicateErr == nil {
lastAppendAtNs = resp.LastAppendAtNs

Loading…
Cancel
Save