438 lines
14 KiB

4 years ago
2 years ago
2 years ago
6 years ago
6 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/operation"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/storage"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. )
  21. /*
  22. Steps to apply erasure coding to .dat .idx files
  23. 0. ensure the volume is readonly
  24. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  25. 2. client ask master for possible servers to hold the ec files
  26. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  27. 4. target servers report the new ec files to the master
  28. 5. master stores vid -> [14]*DataNode
  29. 6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files
  30. */
  31. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  32. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  33. glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
  34. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  35. if v == nil {
  36. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  37. }
  38. baseFileName := v.DataFileName()
  39. if v.Collection != req.Collection {
  40. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  41. }
  42. shouldCleanup := true
  43. defer func() {
  44. if !shouldCleanup {
  45. return
  46. }
  47. for i := 0; i < erasure_coding.TotalShardsCount; i++ {
  48. os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
  49. }
  50. os.Remove(v.IndexFileName() + ".ecx")
  51. }()
  52. // write .ec00 ~ .ec13 files
  53. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  54. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  55. }
  56. // write .ecx file
  57. if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
  58. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
  59. }
  60. // write .vif files
  61. if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
  62. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  63. }
  64. shouldCleanup = false
  65. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  66. }
  67. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  68. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  69. glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
  70. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  71. var rebuiltShardIds []uint32
  72. for _, location := range vs.store.Locations {
  73. _, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location)
  74. if err != nil {
  75. return nil, err
  76. }
  77. if existingShardCount == 0 {
  78. continue
  79. }
  80. if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
  81. // write .ec00 ~ .ec13 files
  82. dataBaseFileName := path.Join(location.Directory, baseFileName)
  83. if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
  84. return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
  85. } else {
  86. rebuiltShardIds = generatedShardIds
  87. }
  88. indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
  89. if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
  90. return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
  91. }
  92. break
  93. }
  94. }
  95. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  96. RebuiltShardIds: rebuiltShardIds,
  97. }, nil
  98. }
  99. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  100. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  101. glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
  102. location := vs.store.FindFreeLocation(types.HardDriveType)
  103. if location == nil {
  104. return nil, fmt.Errorf("no space left")
  105. }
  106. dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  107. indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
  108. err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  109. // copy ec data slices
  110. for _, shardId := range req.ShardIds {
  111. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
  112. return err
  113. }
  114. }
  115. if req.CopyEcxFile {
  116. // copy ecx file
  117. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
  118. return err
  119. }
  120. return nil
  121. }
  122. if req.CopyEcjFile {
  123. // copy ecj file
  124. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
  125. return err
  126. }
  127. }
  128. if req.CopyVifFile {
  129. // copy vif file
  130. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
  131. return err
  132. }
  133. }
  134. return nil
  135. })
  136. if err != nil {
  137. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  138. }
  139. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  140. }
  141. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  142. // the shard should not be mounted before calling this.
  143. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  144. bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  145. glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
  146. for _, location := range vs.store.Locations {
  147. if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
  148. glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
  149. return nil, err
  150. }
  151. }
  152. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  153. }
  154. func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
  155. found := false
  156. indexBaseFilename := path.Join(location.IdxDirectory, bName)
  157. dataBaseFilename := path.Join(location.Directory, bName)
  158. if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
  159. for _, shardId := range shardIds {
  160. shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
  161. if util.FileExists(shardFileName) {
  162. found = true
  163. os.Remove(shardFileName)
  164. }
  165. }
  166. }
  167. if !found {
  168. return nil
  169. }
  170. hasEcxFile, hasIdxFile, existingShardCount, err := checkEcVolumeStatus(bName, location)
  171. if err != nil {
  172. return err
  173. }
  174. if hasEcxFile && existingShardCount == 0 {
  175. if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
  176. return err
  177. }
  178. os.Remove(indexBaseFilename + ".ecj")
  179. if !hasIdxFile {
  180. // .vif is used for ec volumes and normal volumes
  181. os.Remove(dataBaseFilename + ".vif")
  182. }
  183. }
  184. return nil
  185. }
  186. func checkEcVolumeStatus(bName string, location *storage.DiskLocation) (hasEcxFile bool, hasIdxFile bool, existingShardCount int, err error) {
  187. // check whether to delete the .ecx and .ecj file also
  188. fileInfos, err := os.ReadDir(location.Directory)
  189. if err != nil {
  190. return false, false, 0, err
  191. }
  192. if location.IdxDirectory != location.Directory {
  193. idxFileInfos, err := os.ReadDir(location.IdxDirectory)
  194. if err != nil {
  195. return false, false, 0, err
  196. }
  197. fileInfos = append(fileInfos, idxFileInfos...)
  198. }
  199. for _, fileInfo := range fileInfos {
  200. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  201. hasEcxFile = true
  202. continue
  203. }
  204. if fileInfo.Name() == bName+".idx" {
  205. hasIdxFile = true
  206. continue
  207. }
  208. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  209. existingShardCount++
  210. }
  211. }
  212. return hasEcxFile, hasIdxFile, existingShardCount, nil
  213. }
  214. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  215. glog.V(0).Infof("VolumeEcShardsMount: %v", req)
  216. for _, shardId := range req.ShardIds {
  217. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  218. if err != nil {
  219. glog.Errorf("ec shard mount %v: %v", req, err)
  220. } else {
  221. glog.V(2).Infof("ec shard mount %v", req)
  222. }
  223. if err != nil {
  224. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  225. }
  226. }
  227. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  228. }
  229. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  230. glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
  231. for _, shardId := range req.ShardIds {
  232. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  233. if err != nil {
  234. glog.Errorf("ec shard unmount %v: %v", req, err)
  235. } else {
  236. glog.V(2).Infof("ec shard unmount %v", req)
  237. }
  238. if err != nil {
  239. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  240. }
  241. }
  242. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  243. }
  244. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  245. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  246. if !found {
  247. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  248. }
  249. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  250. if !found {
  251. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  252. }
  253. if req.FileKey != 0 {
  254. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  255. if size.IsDeleted() {
  256. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  257. IsDeleted: true,
  258. })
  259. }
  260. }
  261. bufSize := req.Size
  262. if bufSize > BufferSizeLimit {
  263. bufSize = BufferSizeLimit
  264. }
  265. buffer := make([]byte, bufSize)
  266. startOffset, bytesToRead := req.Offset, req.Size
  267. for bytesToRead > 0 {
  268. // min of bytesToRead and bufSize
  269. bufferSize := bufSize
  270. if bufferSize > bytesToRead {
  271. bufferSize = bytesToRead
  272. }
  273. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  274. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  275. if bytesread > 0 {
  276. if int64(bytesread) > bytesToRead {
  277. bytesread = int(bytesToRead)
  278. }
  279. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  280. Data: buffer[:bytesread],
  281. })
  282. if err != nil {
  283. // println("sending", bytesread, "bytes err", err.Error())
  284. return err
  285. }
  286. startOffset += int64(bytesread)
  287. bytesToRead -= int64(bytesread)
  288. }
  289. if err != nil {
  290. if err != io.EOF {
  291. return err
  292. }
  293. return nil
  294. }
  295. }
  296. return nil
  297. }
  298. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  299. glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
  300. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  301. for _, location := range vs.store.Locations {
  302. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  303. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  304. if err != nil {
  305. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  306. }
  307. if size.IsDeleted() {
  308. return resp, nil
  309. }
  310. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  311. if err != nil {
  312. return nil, err
  313. }
  314. break
  315. }
  316. }
  317. return resp, nil
  318. }
  319. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  320. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  321. glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
  322. v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  323. if !found {
  324. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  325. }
  326. if v.Collection != req.Collection {
  327. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  328. }
  329. dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
  330. // calculate .dat file size
  331. datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
  332. if err != nil {
  333. return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
  334. }
  335. // write .dat file from .ec00 ~ .ec09 files
  336. if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
  337. return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
  338. }
  339. // write .idx file from .ecx and .ecj files
  340. if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
  341. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
  342. }
  343. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  344. }