391 lines
12 KiB

5 years ago
6 years ago
6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/chrislusf/seaweedfs/weed/storage"
  17. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  18. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  19. "github.com/chrislusf/seaweedfs/weed/storage/types"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. )
  22. /*
  23. Steps to apply erasure coding to .dat .idx files
  24. 0. ensure the volume is readonly
  25. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  26. 2. client ask master for possible servers to hold the ec files, at least 4 servers
  27. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  28. 4. target servers report the new ec files to the master
  29. 5. master stores vid -> [14]*DataNode
  30. 6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files
  31. */
  32. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  33. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  34. glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
  35. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  36. if v == nil {
  37. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  38. }
  39. baseFileName := v.FileName()
  40. if v.Collection != req.Collection {
  41. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  42. }
  43. // write .ec00 ~ .ec13 files
  44. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  45. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  46. }
  47. // write .ecx file
  48. if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
  49. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
  50. }
  51. // write .vif files
  52. if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
  53. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  54. }
  55. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  56. }
  57. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  58. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  59. glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
  60. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  61. var rebuiltShardIds []uint32
  62. for _, location := range vs.store.Locations {
  63. if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
  64. // write .ec00 ~ .ec13 files
  65. baseFileName = path.Join(location.Directory, baseFileName)
  66. if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
  67. return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
  68. } else {
  69. rebuiltShardIds = generatedShardIds
  70. }
  71. if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
  72. return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
  73. }
  74. break
  75. }
  76. }
  77. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  78. RebuiltShardIds: rebuiltShardIds,
  79. }, nil
  80. }
  81. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  82. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  83. glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
  84. location := vs.store.FindFreeLocation()
  85. if location == nil {
  86. return nil, fmt.Errorf("no space left")
  87. }
  88. baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  89. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  90. // copy ec data slices
  91. for _, shardId := range req.ShardIds {
  92. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
  93. return err
  94. }
  95. }
  96. if req.CopyEcxFile {
  97. // copy ecx file
  98. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. if req.CopyEcjFile {
  104. // copy ecj file
  105. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
  106. return err
  107. }
  108. }
  109. if req.CopyVifFile {
  110. // copy vif file
  111. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. })
  117. if err != nil {
  118. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  119. }
  120. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  121. }
  122. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  123. // the shard should not be mounted before calling this.
  124. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  125. baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  126. glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
  127. found := false
  128. for _, location := range vs.store.Locations {
  129. if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
  130. found = true
  131. baseFilename = path.Join(location.Directory, baseFilename)
  132. for _, shardId := range req.ShardIds {
  133. os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
  134. }
  135. break
  136. }
  137. }
  138. if !found {
  139. return nil, nil
  140. }
  141. // check whether to delete the .ecx and .ecj file also
  142. hasEcxFile := false
  143. hasIdxFile := false
  144. existingShardCount := 0
  145. bName := filepath.Base(baseFilename)
  146. for _, location := range vs.store.Locations {
  147. fileInfos, err := ioutil.ReadDir(location.Directory)
  148. if err != nil {
  149. continue
  150. }
  151. for _, fileInfo := range fileInfos {
  152. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  153. hasEcxFile = true
  154. continue
  155. }
  156. if fileInfo.Name() == bName+".idx" {
  157. hasIdxFile = true
  158. continue
  159. }
  160. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  161. existingShardCount++
  162. }
  163. }
  164. }
  165. if hasEcxFile && existingShardCount == 0 {
  166. if err := os.Remove(baseFilename + ".ecx"); err != nil {
  167. return nil, err
  168. }
  169. os.Remove(baseFilename + ".ecj")
  170. }
  171. if !hasIdxFile {
  172. // .vif is used for ec volumes and normal volumes
  173. os.Remove(baseFilename + ".vif")
  174. }
  175. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  176. }
  177. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  178. glog.V(0).Infof("VolumeEcShardsMount: %v", req)
  179. for _, shardId := range req.ShardIds {
  180. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  181. if err != nil {
  182. glog.Errorf("ec shard mount %v: %v", req, err)
  183. } else {
  184. glog.V(2).Infof("ec shard mount %v", req)
  185. }
  186. if err != nil {
  187. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  188. }
  189. }
  190. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  191. }
  192. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  193. glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
  194. for _, shardId := range req.ShardIds {
  195. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  196. if err != nil {
  197. glog.Errorf("ec shard unmount %v: %v", req, err)
  198. } else {
  199. glog.V(2).Infof("ec shard unmount %v", req)
  200. }
  201. if err != nil {
  202. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  203. }
  204. }
  205. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  206. }
  207. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  208. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  209. if !found {
  210. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  211. }
  212. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  213. if !found {
  214. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  215. }
  216. if req.FileKey != 0 {
  217. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  218. if size == types.TombstoneFileSize {
  219. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  220. IsDeleted: true,
  221. })
  222. }
  223. }
  224. bufSize := req.Size
  225. if bufSize > BufferSizeLimit {
  226. bufSize = BufferSizeLimit
  227. }
  228. buffer := make([]byte, bufSize)
  229. startOffset, bytesToRead := req.Offset, req.Size
  230. for bytesToRead > 0 {
  231. // min of bytesToRead and bufSize
  232. bufferSize := bufSize
  233. if bufferSize > bytesToRead {
  234. bufferSize = bytesToRead
  235. }
  236. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  237. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  238. if bytesread > 0 {
  239. if int64(bytesread) > bytesToRead {
  240. bytesread = int(bytesToRead)
  241. }
  242. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  243. Data: buffer[:bytesread],
  244. })
  245. if err != nil {
  246. // println("sending", bytesread, "bytes err", err.Error())
  247. return err
  248. }
  249. startOffset += int64(bytesread)
  250. bytesToRead -= int64(bytesread)
  251. }
  252. if err != nil {
  253. if err != io.EOF {
  254. return err
  255. }
  256. return nil
  257. }
  258. }
  259. return nil
  260. }
  261. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  262. glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
  263. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  264. for _, location := range vs.store.Locations {
  265. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  266. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  267. if err != nil {
  268. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  269. }
  270. if size == types.TombstoneFileSize {
  271. return resp, nil
  272. }
  273. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  274. if err != nil {
  275. return nil, err
  276. }
  277. break
  278. }
  279. }
  280. return resp, nil
  281. }
  282. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  283. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  284. glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
  285. v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  286. if !found {
  287. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  288. }
  289. baseFileName := v.FileName()
  290. if v.Collection != req.Collection {
  291. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  292. }
  293. // calculate .dat file size
  294. datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
  295. if err != nil {
  296. return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
  297. }
  298. // write .dat file from .ec00 ~ .ec09 files
  299. if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
  300. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  301. }
  302. // write .idx file from .ecx and .ecj files
  303. if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
  304. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
  305. }
  306. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  307. }