You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

319 lines
9.6 KiB

6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/storage"
  15. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  17. "github.com/chrislusf/seaweedfs/weed/storage/types"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. /*
  21. Steps to apply erasure coding to .dat .idx files
  22. 0. ensure the volume is readonly
  23. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
  24. 2. client ask master for possible servers to hold the ec files, at least 4 servers
  25. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  26. 4. target servers report the new ec files to the master
  27. 5. master stores vid -> [14]*DataNode
  28. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  29. */
  30. // VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
  31. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  32. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  33. if v == nil {
  34. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  35. }
  36. baseFileName := v.FileName()
  37. if v.Collection != req.Collection {
  38. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  39. }
  40. // write .ecx file
  41. if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
  42. return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
  43. }
  44. // write .ec01 ~ .ec14 files
  45. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  46. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  47. }
  48. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  49. }
  50. // VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
  51. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  52. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  53. var rebuiltShardIds []uint32
  54. for _, location := range vs.store.Locations {
  55. if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
  56. // write .ec01 ~ .ec14 files
  57. baseFileName = path.Join(location.Directory, baseFileName)
  58. if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
  59. return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
  60. } else {
  61. rebuiltShardIds = generatedShardIds
  62. }
  63. if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
  64. return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
  65. }
  66. break
  67. }
  68. }
  69. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  70. RebuiltShardIds: rebuiltShardIds,
  71. }, nil
  72. }
  73. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  74. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  75. location := vs.store.FindFreeLocation()
  76. if location == nil {
  77. return nil, fmt.Errorf("no space left")
  78. }
  79. baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  80. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  81. // copy ec data slices
  82. for _, shardId := range req.ShardIds {
  83. if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
  84. return err
  85. }
  86. }
  87. if !req.CopyEcxFile {
  88. return nil
  89. }
  90. // copy ecx file
  91. if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
  92. return err
  93. }
  94. // copy ecj file
  95. if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
  96. return err
  97. }
  98. return nil
  99. })
  100. if err != nil {
  101. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  102. }
  103. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  104. }
  105. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  106. // the shard should not be mounted before calling this.
  107. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  108. baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  109. found := false
  110. for _, location := range vs.store.Locations {
  111. if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
  112. found = true
  113. baseFilename = path.Join(location.Directory, baseFilename)
  114. for _, shardId := range req.ShardIds {
  115. os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
  116. }
  117. break
  118. }
  119. }
  120. if !found {
  121. return nil, nil
  122. }
  123. // check whether to delete the ecx file also
  124. hasEcxFile := false
  125. existingShardCount := 0
  126. for _, location := range vs.store.Locations {
  127. fileInfos, err := ioutil.ReadDir(location.Directory)
  128. if err != nil {
  129. continue
  130. }
  131. for _, fileInfo := range fileInfos {
  132. if fileInfo.Name() == baseFilename+".ecx" {
  133. hasEcxFile = true
  134. continue
  135. }
  136. if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
  137. existingShardCount++
  138. }
  139. }
  140. }
  141. if hasEcxFile && existingShardCount == 0 {
  142. if err := os.Remove(baseFilename + ".ecx"); err != nil {
  143. return nil, err
  144. }
  145. if err := os.Remove(baseFilename + ".ecj"); err != nil {
  146. return nil, err
  147. }
  148. }
  149. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  150. }
  151. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  152. for _, shardId := range req.ShardIds {
  153. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  154. if err != nil {
  155. glog.Errorf("ec shard mount %v: %v", req, err)
  156. } else {
  157. glog.V(2).Infof("ec shard mount %v", req)
  158. }
  159. if err != nil {
  160. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  161. }
  162. }
  163. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  164. }
  165. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  166. for _, shardId := range req.ShardIds {
  167. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  168. if err != nil {
  169. glog.Errorf("ec shard unmount %v: %v", req, err)
  170. } else {
  171. glog.V(2).Infof("ec shard unmount %v", req)
  172. }
  173. if err != nil {
  174. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  175. }
  176. }
  177. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  178. }
  179. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  180. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  181. if !found {
  182. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  183. }
  184. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  185. if !found {
  186. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  187. }
  188. if req.FileKey != 0 {
  189. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  190. if size == types.TombstoneFileSize {
  191. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  192. IsDeleted: true,
  193. })
  194. }
  195. }
  196. bufSize := req.Size
  197. if bufSize > BufferSizeLimit {
  198. bufSize = BufferSizeLimit
  199. }
  200. buffer := make([]byte, bufSize)
  201. startOffset, bytesToRead := req.Offset, req.Size
  202. for bytesToRead > 0 {
  203. // min of bytesToRead and bufSize
  204. bufferSize := bufSize
  205. if bufferSize > bytesToRead {
  206. bufferSize = bytesToRead
  207. }
  208. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  209. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  210. if bytesread > 0 {
  211. if int64(bytesread) > bytesToRead {
  212. bytesread = int(bytesToRead)
  213. }
  214. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  215. Data: buffer[:bytesread],
  216. })
  217. if err != nil {
  218. // println("sending", bytesread, "bytes err", err.Error())
  219. return err
  220. }
  221. startOffset += int64(bytesread)
  222. bytesToRead -= int64(bytesread)
  223. }
  224. if err != nil {
  225. if err != io.EOF {
  226. return err
  227. }
  228. return nil
  229. }
  230. }
  231. return nil
  232. }
  233. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  234. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  235. for _, location := range vs.store.Locations {
  236. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  237. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  238. if err != nil {
  239. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  240. }
  241. if size == types.TombstoneFileSize {
  242. return resp, nil
  243. }
  244. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  245. if err != nil {
  246. return nil, err
  247. }
  248. break
  249. }
  250. }
  251. return resp, nil
  252. }