You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

414 lines
13 KiB

4 years ago
5 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/volume_info"
  7. "io"
  8. "io/ioutil"
  9. "math"
  10. "os"
  11. "path"
  12. "strings"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/chrislusf/seaweedfs/weed/storage"
  17. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  18. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  19. "github.com/chrislusf/seaweedfs/weed/storage/types"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. )
  22. /*
  23. Steps to apply erasure coding to .dat .idx files
  24. 0. ensure the volume is readonly
  25. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  26. 2. client ask master for possible servers to hold the ec files
  27. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  28. 4. target servers report the new ec files to the master
  29. 5. master stores vid -> [14]*DataNode
  30. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  31. */
  32. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  33. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  34. glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
  35. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  36. if v == nil {
  37. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  38. }
  39. baseFileName := v.DataFileName()
  40. if v.Collection != req.Collection {
  41. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  42. }
  43. shouldCleanup := true
  44. defer func() {
  45. if !shouldCleanup {
  46. return
  47. }
  48. for i := 0; i < erasure_coding.TotalShardsCount; i++ {
  49. os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
  50. }
  51. os.Remove(v.IndexFileName() + ".ecx")
  52. }()
  53. // write .ec00 ~ .ec13 files
  54. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  55. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  56. }
  57. // write .ecx file
  58. if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
  59. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
  60. }
  61. // write .vif files
  62. if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
  63. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  64. }
  65. shouldCleanup = false
  66. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  67. }
  68. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  69. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  70. glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
  71. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  72. var rebuiltShardIds []uint32
  73. for _, location := range vs.store.Locations {
  74. if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
  75. // write .ec00 ~ .ec13 files
  76. dataBaseFileName := path.Join(location.Directory, baseFileName)
  77. if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
  78. return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
  79. } else {
  80. rebuiltShardIds = generatedShardIds
  81. }
  82. indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
  83. if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
  84. return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
  85. }
  86. break
  87. }
  88. }
  89. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  90. RebuiltShardIds: rebuiltShardIds,
  91. }, nil
  92. }
  93. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  94. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  95. glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
  96. location := vs.store.FindFreeLocation(types.HardDriveType)
  97. if location == nil {
  98. return nil, fmt.Errorf("no space left")
  99. }
  100. dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  101. indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
  102. err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  103. // copy ec data slices
  104. for _, shardId := range req.ShardIds {
  105. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
  106. return err
  107. }
  108. }
  109. if req.CopyEcxFile {
  110. // copy ecx file
  111. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
  112. return err
  113. }
  114. return nil
  115. }
  116. if req.CopyEcjFile {
  117. // copy ecj file
  118. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
  119. return err
  120. }
  121. }
  122. if req.CopyVifFile {
  123. // copy vif file
  124. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
  125. return err
  126. }
  127. }
  128. return nil
  129. })
  130. if err != nil {
  131. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  132. }
  133. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  134. }
  135. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  136. // the shard should not be mounted before calling this.
  137. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  138. bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  139. glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
  140. found := false
  141. var indexBaseFilename, dataBaseFilename string
  142. for _, location := range vs.store.Locations {
  143. if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
  144. found = true
  145. indexBaseFilename = path.Join(location.IdxDirectory, bName)
  146. dataBaseFilename = path.Join(location.Directory, bName)
  147. for _, shardId := range req.ShardIds {
  148. os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
  149. }
  150. break
  151. }
  152. }
  153. if !found {
  154. return nil, nil
  155. }
  156. // check whether to delete the .ecx and .ecj file also
  157. hasEcxFile := false
  158. hasIdxFile := false
  159. existingShardCount := 0
  160. for _, location := range vs.store.Locations {
  161. fileInfos, err := ioutil.ReadDir(location.Directory)
  162. if err != nil {
  163. continue
  164. }
  165. if location.IdxDirectory != location.Directory {
  166. idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
  167. if err != nil {
  168. continue
  169. }
  170. fileInfos = append(fileInfos, idxFileInfos...)
  171. }
  172. for _, fileInfo := range fileInfos {
  173. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  174. hasEcxFile = true
  175. continue
  176. }
  177. if fileInfo.Name() == bName+".idx" {
  178. hasIdxFile = true
  179. continue
  180. }
  181. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  182. existingShardCount++
  183. }
  184. }
  185. }
  186. if hasEcxFile && existingShardCount == 0 {
  187. if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
  188. return nil, err
  189. }
  190. os.Remove(indexBaseFilename + ".ecj")
  191. }
  192. if !hasIdxFile {
  193. // .vif is used for ec volumes and normal volumes
  194. os.Remove(dataBaseFilename + ".vif")
  195. }
  196. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  197. }
  198. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  199. glog.V(0).Infof("VolumeEcShardsMount: %v", req)
  200. for _, shardId := range req.ShardIds {
  201. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  202. if err != nil {
  203. glog.Errorf("ec shard mount %v: %v", req, err)
  204. } else {
  205. glog.V(2).Infof("ec shard mount %v", req)
  206. }
  207. if err != nil {
  208. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  209. }
  210. }
  211. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  212. }
  213. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  214. glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
  215. for _, shardId := range req.ShardIds {
  216. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  217. if err != nil {
  218. glog.Errorf("ec shard unmount %v: %v", req, err)
  219. } else {
  220. glog.V(2).Infof("ec shard unmount %v", req)
  221. }
  222. if err != nil {
  223. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  224. }
  225. }
  226. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  227. }
  228. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  229. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  230. if !found {
  231. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  232. }
  233. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  234. if !found {
  235. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  236. }
  237. if req.FileKey != 0 {
  238. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  239. if size.IsDeleted() {
  240. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  241. IsDeleted: true,
  242. })
  243. }
  244. }
  245. bufSize := req.Size
  246. if bufSize > BufferSizeLimit {
  247. bufSize = BufferSizeLimit
  248. }
  249. buffer := make([]byte, bufSize)
  250. startOffset, bytesToRead := req.Offset, req.Size
  251. for bytesToRead > 0 {
  252. // min of bytesToRead and bufSize
  253. bufferSize := bufSize
  254. if bufferSize > bytesToRead {
  255. bufferSize = bytesToRead
  256. }
  257. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  258. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  259. if bytesread > 0 {
  260. if int64(bytesread) > bytesToRead {
  261. bytesread = int(bytesToRead)
  262. }
  263. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  264. Data: buffer[:bytesread],
  265. })
  266. if err != nil {
  267. // println("sending", bytesread, "bytes err", err.Error())
  268. return err
  269. }
  270. startOffset += int64(bytesread)
  271. bytesToRead -= int64(bytesread)
  272. }
  273. if err != nil {
  274. if err != io.EOF {
  275. return err
  276. }
  277. return nil
  278. }
  279. }
  280. return nil
  281. }
  282. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  283. glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
  284. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  285. for _, location := range vs.store.Locations {
  286. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  287. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  288. if err != nil {
  289. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  290. }
  291. if size.IsDeleted() {
  292. return resp, nil
  293. }
  294. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  295. if err != nil {
  296. return nil, err
  297. }
  298. break
  299. }
  300. }
  301. return resp, nil
  302. }
  303. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  304. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  305. glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
  306. v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  307. if !found {
  308. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  309. }
  310. if v.Collection != req.Collection {
  311. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  312. }
  313. dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
  314. // calculate .dat file size
  315. datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
  316. if err != nil {
  317. return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
  318. }
  319. // write .dat file from .ec00 ~ .ec09 files
  320. if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
  321. return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
  322. }
  323. // write .idx file from .ecx and .ecj files
  324. if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
  325. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
  326. }
  327. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  328. }