You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

377 lines
12 KiB

5 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/chrislusf/seaweedfs/weed/storage"
  17. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  18. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  19. "github.com/chrislusf/seaweedfs/weed/storage/types"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. )
  22. /*
  23. Steps to apply erasure coding to .dat .idx files
  24. 0. ensure the volume is readonly
  25. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  26. 2. client ask master for possible servers to hold the ec files, at least 4 servers
  27. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  28. 4. target servers report the new ec files to the master
  29. 5. master stores vid -> [14]*DataNode
  30. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  31. */
  32. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  33. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  34. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  35. if v == nil {
  36. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  37. }
  38. baseFileName := v.FileName()
  39. if v.Collection != req.Collection {
  40. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  41. }
  42. // write .ecx file
  43. if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
  44. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
  45. }
  46. // write .ec00 ~ .ec13 files
  47. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  48. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  49. }
  50. // write .vif files
  51. if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
  52. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  53. }
  54. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  55. }
  56. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  57. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  58. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  59. var rebuiltShardIds []uint32
  60. for _, location := range vs.store.Locations {
  61. if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
  62. // write .ec00 ~ .ec13 files
  63. baseFileName = path.Join(location.Directory, baseFileName)
  64. if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
  65. return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
  66. } else {
  67. rebuiltShardIds = generatedShardIds
  68. }
  69. if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
  70. return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
  71. }
  72. break
  73. }
  74. }
  75. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  76. RebuiltShardIds: rebuiltShardIds,
  77. }, nil
  78. }
  79. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  80. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  81. location := vs.store.FindFreeLocation()
  82. if location == nil {
  83. return nil, fmt.Errorf("no space left")
  84. }
  85. baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  86. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  87. // copy ec data slices
  88. for _, shardId := range req.ShardIds {
  89. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
  90. return err
  91. }
  92. }
  93. if req.CopyEcxFile {
  94. // copy ecx file
  95. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
  96. return err
  97. }
  98. return nil
  99. }
  100. if req.CopyEcjFile {
  101. // copy ecj file
  102. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
  103. return err
  104. }
  105. }
  106. if req.CopyVifFile {
  107. // copy vif file
  108. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. })
  114. if err != nil {
  115. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  116. }
  117. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  118. }
  119. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  120. // the shard should not be mounted before calling this.
  121. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  122. baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  123. glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
  124. found := false
  125. for _, location := range vs.store.Locations {
  126. if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
  127. found = true
  128. baseFilename = path.Join(location.Directory, baseFilename)
  129. for _, shardId := range req.ShardIds {
  130. os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
  131. }
  132. break
  133. }
  134. }
  135. if !found {
  136. return nil, nil
  137. }
  138. // check whether to delete the .ecx and .ecj file also
  139. hasEcxFile := false
  140. hasIdxFile := false
  141. existingShardCount := 0
  142. bName := filepath.Base(baseFilename)
  143. for _, location := range vs.store.Locations {
  144. fileInfos, err := ioutil.ReadDir(location.Directory)
  145. if err != nil {
  146. continue
  147. }
  148. for _, fileInfo := range fileInfos {
  149. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  150. hasEcxFile = true
  151. continue
  152. }
  153. if fileInfo.Name() == bName+".idx" {
  154. hasIdxFile = true
  155. continue
  156. }
  157. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  158. existingShardCount++
  159. }
  160. }
  161. }
  162. if hasEcxFile && existingShardCount == 0 {
  163. if err := os.Remove(baseFilename + ".ecx"); err != nil {
  164. return nil, err
  165. }
  166. os.Remove(baseFilename + ".ecj")
  167. }
  168. if !hasIdxFile {
  169. // .vif is used for ec volumes and normal volumes
  170. os.Remove(baseFilename + ".vif")
  171. }
  172. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  173. }
  174. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  175. for _, shardId := range req.ShardIds {
  176. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  177. if err != nil {
  178. glog.Errorf("ec shard mount %v: %v", req, err)
  179. } else {
  180. glog.V(2).Infof("ec shard mount %v", req)
  181. }
  182. if err != nil {
  183. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  184. }
  185. }
  186. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  187. }
  188. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  189. for _, shardId := range req.ShardIds {
  190. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  191. if err != nil {
  192. glog.Errorf("ec shard unmount %v: %v", req, err)
  193. } else {
  194. glog.V(2).Infof("ec shard unmount %v", req)
  195. }
  196. if err != nil {
  197. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  198. }
  199. }
  200. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  201. }
  202. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  203. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  204. if !found {
  205. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  206. }
  207. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  208. if !found {
  209. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  210. }
  211. if req.FileKey != 0 {
  212. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  213. if size == types.TombstoneFileSize {
  214. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  215. IsDeleted: true,
  216. })
  217. }
  218. }
  219. bufSize := req.Size
  220. if bufSize > BufferSizeLimit {
  221. bufSize = BufferSizeLimit
  222. }
  223. buffer := make([]byte, bufSize)
  224. startOffset, bytesToRead := req.Offset, req.Size
  225. for bytesToRead > 0 {
  226. // min of bytesToRead and bufSize
  227. bufferSize := bufSize
  228. if bufferSize > bytesToRead {
  229. bufferSize = bytesToRead
  230. }
  231. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  232. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  233. if bytesread > 0 {
  234. if int64(bytesread) > bytesToRead {
  235. bytesread = int(bytesToRead)
  236. }
  237. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  238. Data: buffer[:bytesread],
  239. })
  240. if err != nil {
  241. // println("sending", bytesread, "bytes err", err.Error())
  242. return err
  243. }
  244. startOffset += int64(bytesread)
  245. bytesToRead -= int64(bytesread)
  246. }
  247. if err != nil {
  248. if err != io.EOF {
  249. return err
  250. }
  251. return nil
  252. }
  253. }
  254. return nil
  255. }
  256. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  257. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  258. for _, location := range vs.store.Locations {
  259. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  260. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  261. if err != nil {
  262. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  263. }
  264. if size == types.TombstoneFileSize {
  265. return resp, nil
  266. }
  267. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  268. if err != nil {
  269. return nil, err
  270. }
  271. break
  272. }
  273. }
  274. return resp, nil
  275. }
  276. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  277. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  278. v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  279. if !found {
  280. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  281. }
  282. baseFileName := v.FileName()
  283. if v.Collection != req.Collection {
  284. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  285. }
  286. // calculate .dat file size
  287. datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
  288. if err != nil {
  289. return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
  290. }
  291. // write .dat file from .ec00 ~ .ec09 files
  292. if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
  293. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  294. }
  295. // write .idx file from .ecx and .ecj files
  296. if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
  297. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
  298. }
  299. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  300. }