You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

92 lines
2.0 KiB

  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/klauspost/reedsolomon"
  8. )
  9. const (
  10. DataShardsCount = 10
  11. ParityShardsCount = 4
  12. ErasureCodingLargeBlockSize = 1024 * 1024 * 1024 // 1GB
  13. ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
  14. )
  15. func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  16. bufferSize := int64(len(buffers[0]))
  17. batchCount := blockSize/bufferSize
  18. if blockSize%bufferSize!=0 {
  19. glog.Fatalf("unexpected block size %d buffer size %d", blockSize, bufferSize)
  20. }
  21. for b := int64(0); b < batchCount; b++ {
  22. err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs)
  23. if err != nil {
  24. return err
  25. }
  26. }
  27. return nil
  28. }
  29. func openEcFiles(baseFileName string, forRead bool) (files []*os.File, err error){
  30. for i := 0; i< DataShardsCount+ParityShardsCount; i++{
  31. fname := fmt.Sprintf("%s.ec%02d", baseFileName, i+1)
  32. openOption := os.O_TRUNC|os.O_CREATE|os.O_WRONLY
  33. if forRead {
  34. openOption = os.O_RDONLY
  35. }
  36. f, err := os.OpenFile(fname, openOption, 0644)
  37. if err != nil {
  38. return files, fmt.Errorf("failed to open file %s: %v", fname, err)
  39. }
  40. files = append(files, f)
  41. }
  42. return
  43. }
  44. func closeEcFiles(files []*os.File){
  45. for _, f := range files{
  46. if f != nil {
  47. f.Close()
  48. }
  49. }
  50. }
  51. func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  52. // read data into buffers
  53. for i := 0; i < DataShardsCount; i++ {
  54. n, err := file.ReadAt(buffers[i], startOffset+blockSize*int64(i))
  55. if err != nil {
  56. if err != io.EOF {
  57. return err
  58. }
  59. }
  60. if n < len(buffers[i]) {
  61. for t := len(buffers[i]) - 1; t >= n; t-- {
  62. buffers[i][t] = 0
  63. }
  64. }
  65. }
  66. err := enc.Encode(buffers)
  67. if err != nil {
  68. return err
  69. }
  70. for i := 0; i < DataShardsCount+ParityShardsCount; i++ {
  71. _, err := outputs[i].Write(buffers[i])
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. return nil
  77. }