You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
1.9 KiB

  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/klauspost/reedsolomon"
  8. )
  9. const (
  10. DataShardsCount = 10
  11. ParityShardsCount = 4
  12. ErasureCodingLargeBlockSize = 1024 * 1024 * 1024 // 1GB
  13. ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
  14. )
  15. func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  16. bufferSize := int64(len(buffers[0]))
  17. batchCount := blockSize/bufferSize
  18. if blockSize%bufferSize!=0 {
  19. glog.Fatalf("unexpected block size %d buffer size %d", blockSize, bufferSize)
  20. }
  21. for b := int64(0); b < batchCount; b++ {
  22. err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs)
  23. if err != nil {
  24. return err
  25. }
  26. }
  27. return nil
  28. }
  29. func openEcFiles(baseFileName string) (files []*os.File, err error){
  30. for i := 0; i< DataShardsCount+ParityShardsCount; i++{
  31. fname := fmt.Sprintf("%s.ec%02d", baseFileName, i+1)
  32. f, err := os.OpenFile(fname, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
  33. if err != nil {
  34. return files, fmt.Errorf("failed to open file %s: %v", fname, err)
  35. }
  36. files = append(files, f)
  37. }
  38. return
  39. }
  40. func closeEcFiles(files []*os.File){
  41. for _, f := range files{
  42. if f != nil {
  43. f.Close()
  44. }
  45. }
  46. }
  47. func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  48. // read data into buffers
  49. for i := 0; i < DataShardsCount; i++ {
  50. n, err := file.ReadAt(buffers[i], startOffset+blockSize*int64(i))
  51. if err != nil {
  52. if err != io.EOF {
  53. return err
  54. }
  55. }
  56. if n < len(buffers[i]) {
  57. for t := len(buffers[i]) - 1; t >= n; t-- {
  58. buffers[i][t] = 0
  59. }
  60. }
  61. }
  62. err := enc.Encode(buffers)
  63. if err != nil {
  64. return err
  65. }
  66. for i := 0; i < DataShardsCount+ParityShardsCount; i++ {
  67. _, err := outputs[i].Write(buffers[i])
  68. if err != nil {
  69. return err
  70. }
  71. }
  72. return nil
  73. }