|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
"fmt" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/storage/types" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/wdclient" |
|
|
|
"io" |
|
|
|
"time" |
|
|
|
|
|
|
@ -80,6 +81,15 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func isOneOf(server string, locations []wdclient.Location) bool { |
|
|
|
for _, loc := range locations { |
|
|
|
if server == loc.Url { |
|
|
|
return true |
|
|
|
} |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) { |
|
|
|
// find volume location
|
|
|
|
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) |
|
|
@ -94,6 +104,9 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin |
|
|
|
for _, dst := range allLocations { |
|
|
|
if fn(dst.dataNode) > 0 { |
|
|
|
// ask the volume server to replicate the volume
|
|
|
|
if isOneOf(dst.dataNode.Id, locations) { |
|
|
|
continue |
|
|
|
} |
|
|
|
sourceVolumeServer := "" |
|
|
|
for _, loc := range locations { |
|
|
|
if loc.Url != dst.dataNode.Id { |
|
|
@ -111,12 +124,21 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin |
|
|
|
} |
|
|
|
|
|
|
|
// mark all replicas as read only
|
|
|
|
err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations) |
|
|
|
if err != nil { |
|
|
|
if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil { |
|
|
|
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) |
|
|
|
} |
|
|
|
return LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()) |
|
|
|
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.String()); err != nil { |
|
|
|
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) |
|
|
|
} |
|
|
|
|
|
|
|
// remove the remaining replicas
|
|
|
|
for _, loc := range locations { |
|
|
|
if loc.Url != sourceVolumeServer { |
|
|
|
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil { |
|
|
|
fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|