You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
7.3 KiB

6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. )
  15. /*
  16. Steps to apply erasure coding to .dat .idx files
  17. 0. ensure the volume is readonly
  18. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
  19. 2. client ask master for possible servers to hold the ec files, at least 4 servers
  20. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  21. 4. target servers report the new ec files to the master
  22. 5. master stores vid -> [14]*DataNode
  23. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  24. */
  25. // VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
  26. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  27. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  28. if v == nil {
  29. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  30. }
  31. baseFileName := v.FileName()
  32. if v.Collection != req.Collection {
  33. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  34. }
  35. // write .ecx file
  36. if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
  37. return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
  38. }
  39. // write .ec01 ~ .ec14 files
  40. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  41. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  42. }
  43. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  44. }
  45. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  46. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  47. location := vs.store.FindFreeLocation()
  48. if location == nil {
  49. return nil, fmt.Errorf("no space left")
  50. }
  51. baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
  52. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  53. // copy ecx file
  54. if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
  55. return err
  56. }
  57. // copy ec data slices
  58. for _, shardId := range req.ShardIds {
  59. if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
  60. return err
  61. }
  62. }
  63. return nil
  64. })
  65. if err != nil {
  66. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  67. }
  68. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  69. }
  70. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
  71. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  72. foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req)
  73. if err != nil {
  74. return nil, err
  75. }
  76. if !foundExistingVolume {
  77. err = vs.doDeleteMountedShards(ctx, req)
  78. }
  79. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err
  80. }
  81. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
  82. func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) {
  83. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  84. if v == nil {
  85. return false, nil
  86. }
  87. baseFileName := v.FileName()
  88. for _, shardId := range req.ShardIds {
  89. if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil {
  90. return true, err
  91. }
  92. }
  93. if req.ShouldDeleteEcx {
  94. if err := os.Remove(baseFileName + ".ecx"); err != nil {
  95. return true, err
  96. }
  97. }
  98. return true, nil
  99. }
  100. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
  101. func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (error) {
  102. ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  103. if !found {
  104. return fmt.Errorf("volume %d not found", req.VolumeId)
  105. }
  106. for _, shardId := range req.ShardIds {
  107. if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found {
  108. shard.Destroy()
  109. }
  110. }
  111. if len(ecv.Shards) == 0 {
  112. vs.store.DestroyEcVolume(needle.VolumeId(req.VolumeId))
  113. }
  114. return nil
  115. }
  116. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  117. for _, shardId := range req.ShardIds {
  118. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  119. if err != nil {
  120. glog.Errorf("ec shard mount %v: %v", req, err)
  121. } else {
  122. glog.V(2).Infof("ec shard mount %v", req)
  123. }
  124. if err != nil {
  125. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  126. }
  127. }
  128. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  129. }
  130. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  131. for _, shardId := range req.ShardIds {
  132. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  133. if err != nil {
  134. glog.Errorf("ec shard unmount %v: %v", req, err)
  135. } else {
  136. glog.V(2).Infof("ec shard unmount %v", req)
  137. }
  138. if err != nil {
  139. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  140. }
  141. }
  142. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  143. }
  144. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  145. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  146. if !found {
  147. return fmt.Errorf("not found ec volume id %d", req.VolumeId)
  148. }
  149. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  150. if !found {
  151. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  152. }
  153. bufSize := req.Size
  154. if bufSize > BufferSizeLimit {
  155. bufSize = BufferSizeLimit
  156. }
  157. buffer := make([]byte, bufSize)
  158. startOffset, bytesToRead := req.Offset, req.Size
  159. for bytesToRead > 0 {
  160. bytesread, err := ecShard.ReadAt(buffer, startOffset)
  161. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
  162. if bytesread > 0 {
  163. if int64(bytesread) > bytesToRead {
  164. bytesread = int(bytesToRead)
  165. }
  166. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  167. Data: buffer[:bytesread],
  168. })
  169. if err != nil {
  170. // println("sending", bytesread, "bytes err", err.Error())
  171. return err
  172. }
  173. bytesToRead -= int64(bytesread)
  174. }
  175. if err != nil {
  176. if err != io.EOF {
  177. return err
  178. }
  179. return nil
  180. }
  181. }
  182. return nil
  183. }