You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

310 lines
8.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "github.com/klauspost/reedsolomon"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. )
  13. const (
  14. DataShardsCount = 10
  15. ParityShardsCount = 4
  16. TotalShardsCount = DataShardsCount + ParityShardsCount
  17. ErasureCodingLargeBlockSize = 1024 * 1024 * 1024 // 1GB
  18. ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
  19. )
  20. // WriteSortedFileFromIdx generates .ecx file from existing .idx file
  21. // all keys are sorted in ascending order
  22. func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
  23. nm, err := readNeedleMap(baseFileName)
  24. if nm != nil {
  25. defer nm.Close()
  26. }
  27. if err != nil {
  28. return fmt.Errorf("readNeedleMap: %v", err)
  29. }
  30. ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
  31. if err != nil {
  32. return fmt.Errorf("failed to open ecx file: %v", err)
  33. }
  34. defer ecxFile.Close()
  35. err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
  36. bytes := value.ToBytes()
  37. _, writeErr := ecxFile.Write(bytes)
  38. return writeErr
  39. })
  40. if err != nil {
  41. return fmt.Errorf("failed to visit idx file: %v", err)
  42. }
  43. return nil
  44. }
  45. // WriteEcFiles generates .ec00 ~ .ec13 files
  46. func WriteEcFiles(baseFileName string) error {
  47. return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
  48. }
  49. func RebuildEcFiles(baseFileName string) ([]uint32, error) {
  50. return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
  51. }
  52. func ToExt(ecIndex int) string {
  53. return fmt.Sprintf(".ec%02d", ecIndex)
  54. }
  55. func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) error {
  56. file, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
  57. if err != nil {
  58. return fmt.Errorf("failed to open dat file: %v", err)
  59. }
  60. defer file.Close()
  61. fi, err := file.Stat()
  62. if err != nil {
  63. return fmt.Errorf("failed to stat dat file: %v", err)
  64. }
  65. glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size())
  66. err = encodeDatFile(fi.Size(), baseFileName, bufferSize, largeBlockSize, file, smallBlockSize)
  67. if err != nil {
  68. return fmt.Errorf("encodeDatFile: %v", err)
  69. }
  70. return nil
  71. }
  72. func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) {
  73. shardHasData := make([]bool, TotalShardsCount)
  74. inputFiles := make([]*os.File, TotalShardsCount)
  75. outputFiles := make([]*os.File, TotalShardsCount)
  76. for shardId := 0; shardId < TotalShardsCount; shardId++ {
  77. shardFileName := baseFileName + ToExt(shardId)
  78. if util.FileExists(shardFileName) {
  79. shardHasData[shardId] = true
  80. inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0)
  81. if err != nil {
  82. return nil, err
  83. }
  84. defer inputFiles[shardId].Close()
  85. } else {
  86. outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644)
  87. if err != nil {
  88. return nil, err
  89. }
  90. defer outputFiles[shardId].Close()
  91. generatedShardIds = append(generatedShardIds, uint32(shardId))
  92. }
  93. }
  94. err = rebuildEcFiles(shardHasData, inputFiles, outputFiles)
  95. if err != nil {
  96. return nil, fmt.Errorf("rebuildEcFiles: %v", err)
  97. }
  98. return
  99. }
  100. func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  101. bufferSize := int64(len(buffers[0]))
  102. if bufferSize == 0 {
  103. glog.Fatal("unexpected zero buffer size")
  104. }
  105. batchCount := blockSize / bufferSize
  106. if blockSize%bufferSize != 0 {
  107. glog.Fatalf("unexpected block size %d buffer size %d", blockSize, bufferSize)
  108. }
  109. for b := int64(0); b < batchCount; b++ {
  110. err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs)
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. func openEcFiles(baseFileName string, forRead bool) (files []*os.File, err error) {
  118. for i := 0; i < TotalShardsCount; i++ {
  119. fname := baseFileName + ToExt(i)
  120. openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY
  121. if forRead {
  122. openOption = os.O_RDONLY
  123. }
  124. f, err := os.OpenFile(fname, openOption, 0644)
  125. if err != nil {
  126. return files, fmt.Errorf("failed to open file %s: %v", fname, err)
  127. }
  128. files = append(files, f)
  129. }
  130. return
  131. }
  132. func closeEcFiles(files []*os.File) {
  133. for _, f := range files {
  134. if f != nil {
  135. f.Close()
  136. }
  137. }
  138. }
  139. func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  140. // read data into buffers
  141. for i := 0; i < DataShardsCount; i++ {
  142. n, err := file.ReadAt(buffers[i], startOffset+blockSize*int64(i))
  143. if err != nil {
  144. if err != io.EOF {
  145. return err
  146. }
  147. }
  148. if n < len(buffers[i]) {
  149. for t := len(buffers[i]) - 1; t >= n; t-- {
  150. buffers[i][t] = 0
  151. }
  152. }
  153. }
  154. err := enc.Encode(buffers)
  155. if err != nil {
  156. return err
  157. }
  158. for i := 0; i < TotalShardsCount; i++ {
  159. _, err := outputs[i].Write(buffers[i])
  160. if err != nil {
  161. return err
  162. }
  163. }
  164. return nil
  165. }
  166. func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error {
  167. var processedSize int64
  168. enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
  169. if err != nil {
  170. return fmt.Errorf("failed to create encoder: %v", err)
  171. }
  172. buffers := make([][]byte, TotalShardsCount)
  173. for i := range buffers {
  174. buffers[i] = make([]byte, bufferSize)
  175. }
  176. outputs, err := openEcFiles(baseFileName, false)
  177. defer closeEcFiles(outputs)
  178. if err != nil {
  179. return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err)
  180. }
  181. for remainingSize > largeBlockSize*DataShardsCount {
  182. err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs)
  183. if err != nil {
  184. return fmt.Errorf("failed to encode large chunk data: %v", err)
  185. }
  186. remainingSize -= largeBlockSize * DataShardsCount
  187. processedSize += largeBlockSize * DataShardsCount
  188. }
  189. for remainingSize > 0 {
  190. err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
  191. if err != nil {
  192. return fmt.Errorf("failed to encode small chunk data: %v", err)
  193. }
  194. remainingSize -= smallBlockSize * DataShardsCount
  195. processedSize += smallBlockSize * DataShardsCount
  196. }
  197. return nil
  198. }
  199. func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error {
  200. enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
  201. if err != nil {
  202. return fmt.Errorf("failed to create encoder: %v", err)
  203. }
  204. buffers := make([][]byte, TotalShardsCount)
  205. for i := range buffers {
  206. if shardHasData[i] {
  207. buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
  208. }
  209. }
  210. var startOffset int64
  211. var inputBufferDataSize int
  212. for {
  213. // read the input data from files
  214. for i := 0; i < TotalShardsCount; i++ {
  215. if shardHasData[i] {
  216. n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
  217. if n == 0 {
  218. return nil
  219. }
  220. if inputBufferDataSize == 0 {
  221. inputBufferDataSize = n
  222. }
  223. if inputBufferDataSize != n {
  224. return fmt.Errorf("ec shard size expected %d actual %d", inputBufferDataSize, n)
  225. }
  226. } else {
  227. buffers[i] = nil
  228. }
  229. }
  230. // encode the data
  231. err = enc.Reconstruct(buffers)
  232. if err != nil {
  233. return fmt.Errorf("reconstruct: %v", err)
  234. }
  235. // write the data to output files
  236. for i := 0; i < TotalShardsCount; i++ {
  237. if !shardHasData[i] {
  238. n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
  239. if inputBufferDataSize != n {
  240. return fmt.Errorf("fail to write to %s", outputFiles[i].Name())
  241. }
  242. }
  243. }
  244. startOffset += int64(inputBufferDataSize)
  245. }
  246. }
  247. func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
  248. indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
  249. if err != nil {
  250. return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
  251. }
  252. defer indexFile.Close()
  253. cm := needle_map.NewMemDb()
  254. err = idx.WalkIndexFile(indexFile, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
  255. if !offset.IsZero() && size != types.TombstoneFileSize {
  256. cm.Set(key, offset, size)
  257. } else {
  258. cm.Delete(key)
  259. }
  260. return nil
  261. })
  262. return cm, err
  263. }