You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

199 lines
5.6 KiB

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage"
  9. "io"
  10. "os"
  11. )
  12. func (vs *VolumeServer) ReplicateVolume(ctx context.Context, req *volume_server_pb.ReplicateVolumeRequest) (*volume_server_pb.ReplicateVolumeResponse, error) {
  13. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  14. if v != nil {
  15. // unmount the volume
  16. err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
  17. if err != nil {
  18. return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err)
  19. }
  20. }
  21. location := vs.store.FindFreeLocation()
  22. if location == nil {
  23. return nil, fmt.Errorf("no space left")
  24. }
  25. volumeFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
  26. // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
  27. // copy .dat and .idx files
  28. // read .idx .dat file size and timestamp
  29. // send .idx file
  30. // send .dat file
  31. // confirm size and timestamp
  32. var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
  33. datFileName := volumeFileName + ".dat"
  34. idxFileName := volumeFileName + ".idx"
  35. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  36. var err error
  37. volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
  38. &volume_server_pb.ReadVolumeFileStatusRequest{
  39. VolumeId: req.VolumeId,
  40. })
  41. if nil != err {
  42. return fmt.Errorf("read volume file status failed, %v", err)
  43. }
  44. copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  45. VolumeId: req.VolumeId,
  46. IsIdxFile: true,
  47. })
  48. if err != nil {
  49. return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
  50. }
  51. err = writeToFile(copyFileClient, idxFileName)
  52. if err != nil {
  53. return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
  54. }
  55. copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  56. VolumeId: req.VolumeId,
  57. IsDatFile: true,
  58. })
  59. if err != nil {
  60. return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
  61. }
  62. err = writeToFile(copyFileClient, datFileName)
  63. if err != nil {
  64. return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
  65. }
  66. return nil
  67. })
  68. if err != nil {
  69. os.Remove(idxFileName)
  70. os.Remove(datFileName)
  71. return nil, err
  72. }
  73. if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
  74. return nil, err
  75. }
  76. // mount the volume
  77. err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
  78. if err != nil {
  79. return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
  80. }
  81. return &volume_server_pb.ReplicateVolumeResponse{}, err
  82. }
  83. /**
  84. only check the the differ of the file size
  85. todo: maybe should check the received count and deleted count of the volume
  86. */
  87. func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
  88. stat, err := os.Stat(idxFileName)
  89. if err != nil {
  90. return fmt.Errorf("get idx file info failed, %v", err)
  91. }
  92. if originFileInf.IdxFileSize != uint64(stat.Size()) {
  93. return fmt.Errorf("the idx file size [%v] is not same as origin file size [%v]",
  94. stat.Size(), originFileInf.IdxFileSize)
  95. }
  96. stat, err = os.Stat(datFileName)
  97. if err != nil {
  98. return fmt.Errorf("get dat file info failed, %v", err)
  99. }
  100. if originFileInf.DatFileSize != uint64(stat.Size()) {
  101. return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
  102. stat.Size(), originFileInf.DatFileSize)
  103. }
  104. return nil
  105. }
  106. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  107. glog.V(4).Infof("writing to %s", fileName)
  108. dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  109. if err != nil {
  110. return nil
  111. }
  112. defer dst.Close()
  113. for {
  114. resp, receiveErr := client.Recv()
  115. if receiveErr == io.EOF {
  116. break
  117. }
  118. if receiveErr != nil {
  119. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  120. }
  121. dst.Write(resp.FileContent)
  122. }
  123. return nil
  124. }
  125. func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
  126. resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
  127. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  128. if v == nil {
  129. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  130. }
  131. resp.VolumeId = req.VolumeId
  132. resp.DatFileSize = v.DataFileSize()
  133. resp.IdxFileSize = v.IndexFileSize()
  134. resp.DatFileTimestamp = v.LastModifiedTime()
  135. resp.IdxFileTimestamp = v.LastModifiedTime()
  136. resp.FileCount = v.FileCount()
  137. return resp, nil
  138. }
  139. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
  140. v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
  141. if v == nil {
  142. return fmt.Errorf("not found volume id %d", req.VolumeId)
  143. }
  144. const BufferSize = 1024 * 16
  145. var fileName = v.FileName()
  146. if req.IsDatFile {
  147. fileName += ".dat"
  148. } else if req.IsIdxFile {
  149. fileName += ".idx"
  150. }
  151. file, err := os.Open(fileName)
  152. if err != nil {
  153. return err
  154. }
  155. defer file.Close()
  156. buffer := make([]byte, BufferSize)
  157. for {
  158. bytesread, err := file.Read(buffer)
  159. if err != nil {
  160. if err != io.EOF {
  161. return err
  162. }
  163. break
  164. }
  165. stream.Send(&volume_server_pb.CopyFileResponse{
  166. FileContent: buffer[:bytesread],
  167. })
  168. }
  169. return nil
  170. }