You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
7.6 KiB

6 years ago
6 years ago
6 years ago
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  12. "google.golang.org/grpc"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandVolumeMove{})
  16. }
  17. type commandVolumeMove struct {
  18. }
  19. func (c *commandVolumeMove) Name() string {
  20. return "volume.move"
  21. }
  22. func (c *commandVolumeMove) Help() string {
  23. return `move a live volume from one volume server to another volume server
  24. volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id>
  25. volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [ssd|hdd]
  26. This command move a live volume from one volume server to another volume server. Here are the steps:
  27. 1. This command asks the target volume server to copy the source volume from source volume server, remember the last entry's timestamp.
  28. 2. This command asks the target volume server to mount the new volume
  29. Now the master will mark this volume id as readonly.
  30. 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain the requests.
  31. 4. This command asks the source volume server to unmount the source volume
  32. Now the master will mark this volume id as writable.
  33. 5. This command asks the source volume server to delete the source volume
  34. The option "-disk [ssd|hdd]" can be used to change the volume disk type.
  35. `
  36. }
  37. func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. if err = commandEnv.confirmIsLocked(); err != nil {
  39. return
  40. }
  41. volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  42. volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id")
  43. sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
  44. targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
  45. diskTypeStr := volMoveCommand.String("disk", "", "[ssd|hdd] overwrite the original disk type")
  46. if err = volMoveCommand.Parse(args); err != nil {
  47. return nil
  48. }
  49. sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr
  50. volumeId := needle.VolumeId(*volumeIdInt)
  51. if sourceVolumeServer == targetVolumeServer {
  52. return fmt.Errorf("source and target volume servers are the same!")
  53. }
  54. return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr)
  55. }
  56. // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
  57. func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) {
  58. log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  59. lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
  60. if err != nil {
  61. return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
  62. }
  63. log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  64. if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
  65. return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
  66. }
  67. log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
  68. if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
  69. return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
  70. }
  71. log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  72. return nil
  73. }
  74. func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) {
  75. // check to see if the volume is already read-only and if its not then we need
  76. // to mark it as read-only and then before we return we need to undo what we
  77. // did
  78. var shouldMarkWritable bool
  79. defer func() {
  80. if !shouldMarkWritable {
  81. return
  82. }
  83. clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  84. _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
  85. VolumeId: uint32(volumeId),
  86. })
  87. return writableErr
  88. })
  89. if clientErr != nil {
  90. log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr)
  91. }
  92. }()
  93. err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  94. resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
  95. VolumeId: uint32(volumeId),
  96. })
  97. if statusErr == nil && !resp.IsReadOnly {
  98. shouldMarkWritable = true
  99. _, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  100. VolumeId: uint32(volumeId),
  101. })
  102. return readonlyErr
  103. }
  104. return statusErr
  105. })
  106. if err != nil {
  107. return
  108. }
  109. err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  110. resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
  111. VolumeId: uint32(volumeId),
  112. SourceDataNode: sourceVolumeServer,
  113. DiskType: diskType,
  114. })
  115. if replicateErr == nil {
  116. lastAppendAtNs = resp.LastAppendAtNs
  117. }
  118. return replicateErr
  119. })
  120. return
  121. }
  122. func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
  123. return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  124. _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
  125. VolumeId: uint32(volumeId),
  126. SinceNs: lastAppendAtNs,
  127. IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
  128. SourceVolumeServer: sourceVolumeServer,
  129. })
  130. return replicateErr
  131. })
  132. }
  133. func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
  134. return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  135. _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
  136. VolumeId: uint32(volumeId),
  137. })
  138. return deleteErr
  139. })
  140. }
  141. func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) {
  142. return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  143. if writable {
  144. _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
  145. VolumeId: uint32(volumeId),
  146. })
  147. } else {
  148. _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  149. VolumeId: uint32(volumeId),
  150. })
  151. }
  152. return err
  153. })
  154. }