You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
3.9 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "io"
  9. "os"
  10. )
  11. func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_pb.ReplicateVolumeRequest) (*volume_server_pb.ReplicateVolumeResponse, error) {
  12. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  13. if v != nil {
  14. // unmount the volume
  15. err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
  16. if err != nil {
  17. return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err)
  18. }
  19. }
  20. location := vs.store.FindFreeLocation()
  21. if location == nil {
  22. return nil, fmt.Errorf("no space left")
  23. }
  24. volumeFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
  25. // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
  26. // copy .dat and .idx files
  27. // read .idx .dat file size and timestamp
  28. // send .idx file
  29. // send .dat file
  30. // confirm size and timestamp
  31. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  32. // TODO read file sizes before copying
  33. client.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{})
  34. copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  35. VolumeId: req.VolumeId,
  36. IsIdxFile: true,
  37. })
  38. if err != nil {
  39. return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
  40. }
  41. err = writeToFile(copyFileClient, volumeFileName+".idx")
  42. if err != nil {
  43. return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
  44. }
  45. copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  46. VolumeId: req.VolumeId,
  47. IsDatFile: true,
  48. })
  49. if err != nil {
  50. return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
  51. }
  52. err = writeToFile(copyFileClient, volumeFileName+".dat")
  53. if err != nil {
  54. return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
  55. }
  56. return nil
  57. })
  58. if err != nil {
  59. return nil, err
  60. }
  61. // TODO: check the timestamp and size
  62. // mount the volume
  63. err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
  64. if err != nil {
  65. return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
  66. }
  67. return &volume_server_pb.ReplicateVolumeResponse{}, err
  68. }
  69. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  70. println("writing to ", fileName)
  71. dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  72. if err != nil {
  73. return nil
  74. }
  75. defer dst.Close()
  76. for {
  77. resp, receiveErr := client.Recv()
  78. if receiveErr == io.EOF {
  79. break
  80. }
  81. if receiveErr != nil {
  82. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  83. }
  84. dst.Write(resp.FileContent)
  85. }
  86. return nil
  87. }
  88. func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
  89. resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
  90. return resp, nil
  91. }
  92. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
  93. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  94. if v == nil {
  95. return fmt.Errorf("not found volume id %d", req.VolumeId)
  96. }
  97. const BufferSize = 1024 * 16
  98. var fileName = v.FileName()
  99. if req.IsDatFile {
  100. fileName += ".dat"
  101. } else if req.IsIdxFile {
  102. fileName += ".idx"
  103. }
  104. file, err := os.Open(fileName)
  105. if err != nil {
  106. return err
  107. }
  108. defer file.Close()
  109. buffer := make([]byte, BufferSize)
  110. for {
  111. bytesread, err := file.Read(buffer)
  112. if err != nil {
  113. if err != io.EOF {
  114. return err
  115. }
  116. break
  117. }
  118. stream.Send(&volume_server_pb.CopyFileResponse{
  119. FileContent: buffer[:bytesread],
  120. })
  121. }
  122. return nil
  123. }