You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

233 lines
6.8 KiB

6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. // VolumeCopy copy the .idx .dat files, and mount the volume
  15. func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
  16. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  17. if v != nil {
  18. return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
  19. }
  20. location := vs.store.FindFreeLocation()
  21. if location == nil {
  22. return nil, fmt.Errorf("no space left")
  23. }
  24. // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
  25. // copy .dat and .idx files
  26. // read .idx .dat file size and timestamp
  27. // send .idx file
  28. // send .dat file
  29. // confirm size and timestamp
  30. var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
  31. var volumeFileName, idxFileName, datFileName string
  32. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  33. var err error
  34. volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
  35. &volume_server_pb.ReadVolumeFileStatusRequest{
  36. VolumeId: req.VolumeId,
  37. })
  38. if nil != err {
  39. return fmt.Errorf("read volume file status failed, %v", err)
  40. }
  41. volumeFileName = storage.VolumeFileName(volFileInfoResp.Collection, location.Directory, int(req.VolumeId))
  42. // println("source:", volFileInfoResp.String())
  43. copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  44. VolumeId: req.VolumeId,
  45. IsIdxFile: true,
  46. CompactionRevision: volFileInfoResp.CompactionRevision,
  47. StopOffset: volFileInfoResp.IdxFileSize,
  48. })
  49. if err != nil {
  50. return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err)
  51. }
  52. idxFileName = volumeFileName + ".idx"
  53. err = writeToFile(copyFileClient, idxFileName)
  54. if err != nil {
  55. return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err)
  56. }
  57. copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
  58. VolumeId: req.VolumeId,
  59. IsDatFile: true,
  60. CompactionRevision: volFileInfoResp.CompactionRevision,
  61. StopOffset: volFileInfoResp.DatFileSize,
  62. })
  63. if err != nil {
  64. return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err)
  65. }
  66. datFileName = volumeFileName + ".dat"
  67. err = writeToFile(copyFileClient, datFileName)
  68. if err != nil {
  69. return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err)
  70. }
  71. return nil
  72. })
  73. if err != nil && volumeFileName != "" {
  74. if idxFileName != "" {
  75. os.Remove(idxFileName)
  76. }
  77. if datFileName != "" {
  78. os.Remove(datFileName)
  79. }
  80. return nil, err
  81. }
  82. if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
  83. return nil, err
  84. }
  85. // mount the volume
  86. err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  87. if err != nil {
  88. return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
  89. }
  90. return &volume_server_pb.VolumeCopyResponse{
  91. LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
  92. }, err
  93. }
  94. /**
  95. only check the the differ of the file size
  96. todo: maybe should check the received count and deleted count of the volume
  97. */
  98. func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
  99. stat, err := os.Stat(idxFileName)
  100. if err != nil {
  101. return fmt.Errorf("get idx file info failed, %v", err)
  102. }
  103. if originFileInf.IdxFileSize != uint64(stat.Size()) {
  104. return fmt.Errorf("the idx file size [%v] is not same as origin file size [%v]",
  105. stat.Size(), originFileInf.IdxFileSize)
  106. }
  107. stat, err = os.Stat(datFileName)
  108. if err != nil {
  109. return fmt.Errorf("get dat file info failed, %v", err)
  110. }
  111. if originFileInf.DatFileSize != uint64(stat.Size()) {
  112. return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
  113. stat.Size(), originFileInf.DatFileSize)
  114. }
  115. return nil
  116. }
  117. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  118. glog.V(4).Infof("writing to %s", fileName)
  119. dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  120. if err != nil {
  121. return nil
  122. }
  123. defer dst.Close()
  124. for {
  125. resp, receiveErr := client.Recv()
  126. if receiveErr == io.EOF {
  127. break
  128. }
  129. if receiveErr != nil {
  130. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  131. }
  132. dst.Write(resp.FileContent)
  133. }
  134. return nil
  135. }
  136. func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
  137. resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
  138. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  139. if v == nil {
  140. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  141. }
  142. resp.VolumeId = req.VolumeId
  143. datSize, idxSize, modTime := v.FileStat()
  144. resp.DatFileSize = datSize
  145. resp.IdxFileSize = idxSize
  146. resp.DatFileTimestampSeconds = uint64(modTime.Unix())
  147. resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
  148. resp.FileCount = v.FileCount()
  149. resp.CompactionRevision = uint32(v.CompactionRevision)
  150. resp.Collection = v.Collection
  151. return resp, nil
  152. }
  153. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
  154. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  155. if v == nil {
  156. return fmt.Errorf("not found volume id %d", req.VolumeId)
  157. }
  158. if uint32(v.CompactionRevision) != req.CompactionRevision {
  159. return fmt.Errorf("volume %d is compacted", req.VolumeId)
  160. }
  161. bytesToRead := int64(req.StopOffset)
  162. const BufferSize = 1024 * 1024 * 2
  163. var fileName = v.FileName()
  164. if req.IsDatFile {
  165. fileName += ".dat"
  166. } else if req.IsIdxFile {
  167. fileName += ".idx"
  168. }
  169. file, err := os.Open(fileName)
  170. if err != nil {
  171. return err
  172. }
  173. defer file.Close()
  174. buffer := make([]byte, BufferSize)
  175. for bytesToRead > 0 {
  176. bytesread, err := file.Read(buffer)
  177. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
  178. if err != nil {
  179. if err != io.EOF {
  180. return err
  181. }
  182. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
  183. break
  184. }
  185. if int64(bytesread) > bytesToRead {
  186. bytesread = int(bytesToRead)
  187. }
  188. err = stream.Send(&volume_server_pb.CopyFileResponse{
  189. FileContent: buffer[:bytesread],
  190. })
  191. if err != nil {
  192. // println("sending", bytesread, "bytes err", err.Error())
  193. return err
  194. }
  195. bytesToRead -= int64(bytesread)
  196. }
  197. return nil
  198. }