You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
7.9 KiB

6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/storage"
  15. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. )
  19. /*
  20. Steps to apply erasure coding to .dat .idx files
  21. 0. ensure the volume is readonly
  22. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
  23. 2. client ask master for possible servers to hold the ec files, at least 4 servers
  24. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  25. 4. target servers report the new ec files to the master
  26. 5. master stores vid -> [14]*DataNode
  27. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  28. */
  29. // VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
  30. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  31. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  32. if v == nil {
  33. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  34. }
  35. baseFileName := v.FileName()
  36. if v.Collection != req.Collection {
  37. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  38. }
  39. // write .ecx file
  40. if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
  41. return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
  42. }
  43. // write .ec01 ~ .ec14 files
  44. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  45. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  46. }
  47. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  48. }
  49. // VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
  50. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  51. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  52. var rebuiltShardIds []uint32
  53. for _, location := range vs.store.Locations {
  54. if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
  55. // write .ec01 ~ .ec14 files
  56. baseFileName = path.Join(location.Directory, baseFileName)
  57. if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
  58. return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
  59. } else {
  60. rebuiltShardIds = generatedShardIds
  61. }
  62. break
  63. }
  64. }
  65. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  66. RebuiltShardIds: rebuiltShardIds,
  67. }, nil
  68. }
  69. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  70. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  71. location := vs.store.FindFreeLocation()
  72. if location == nil {
  73. return nil, fmt.Errorf("no space left")
  74. }
  75. baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  76. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  77. // copy ec data slices
  78. for _, shardId := range req.ShardIds {
  79. if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
  80. return err
  81. }
  82. }
  83. if !req.CopyEcxFile {
  84. return nil
  85. }
  86. // copy ecx file
  87. if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
  88. return err
  89. }
  90. return nil
  91. })
  92. if err != nil {
  93. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  94. }
  95. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  96. }
  97. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  98. // the shard should not be mounted before calling this.
  99. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  100. baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  101. found := false
  102. for _, location := range vs.store.Locations {
  103. if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
  104. found = true
  105. baseFilename = path.Join(location.Directory, baseFilename)
  106. for _, shardId := range req.ShardIds {
  107. os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
  108. }
  109. break
  110. }
  111. }
  112. if !found {
  113. return nil, nil
  114. }
  115. // check whether to delete the ecx file also
  116. hasEcxFile := false
  117. existingShardCount := 0
  118. for _, location := range vs.store.Locations {
  119. fileInfos, err := ioutil.ReadDir(location.Directory)
  120. if err != nil {
  121. continue
  122. }
  123. for _, fileInfo := range fileInfos {
  124. if fileInfo.Name() == baseFilename+".ecx" {
  125. hasEcxFile = true
  126. continue
  127. }
  128. if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
  129. existingShardCount++
  130. }
  131. }
  132. }
  133. if hasEcxFile && existingShardCount == 0 {
  134. if err := os.Remove(baseFilename + ".ecx"); err != nil {
  135. return nil, err
  136. }
  137. }
  138. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  139. }
  140. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  141. for _, shardId := range req.ShardIds {
  142. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  143. if err != nil {
  144. glog.Errorf("ec shard mount %v: %v", req, err)
  145. } else {
  146. glog.V(2).Infof("ec shard mount %v", req)
  147. }
  148. if err != nil {
  149. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  150. }
  151. }
  152. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  153. }
  154. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  155. for _, shardId := range req.ShardIds {
  156. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  157. if err != nil {
  158. glog.Errorf("ec shard unmount %v: %v", req, err)
  159. } else {
  160. glog.V(2).Infof("ec shard unmount %v", req)
  161. }
  162. if err != nil {
  163. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  164. }
  165. }
  166. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  167. }
  168. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  169. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  170. if !found {
  171. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  172. }
  173. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  174. if !found {
  175. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  176. }
  177. bufSize := req.Size
  178. if bufSize > BufferSizeLimit {
  179. bufSize = BufferSizeLimit
  180. }
  181. buffer := make([]byte, bufSize)
  182. startOffset, bytesToRead := req.Offset, req.Size
  183. for bytesToRead > 0 {
  184. bytesread, err := ecShard.ReadAt(buffer, startOffset)
  185. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
  186. if bytesread > 0 {
  187. if int64(bytesread) > bytesToRead {
  188. bytesread = int(bytesToRead)
  189. }
  190. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  191. Data: buffer[:bytesread],
  192. })
  193. if err != nil {
  194. // println("sending", bytesread, "bytes err", err.Error())
  195. return err
  196. }
  197. bytesToRead -= int64(bytesread)
  198. }
  199. if err != nil {
  200. if err != io.EOF {
  201. return err
  202. }
  203. return nil
  204. }
  205. }
  206. return nil
  207. }