package erasure_coding

import (
	"fmt"
	"io"
	"os"

	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/idx"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)
// WriteIdxFileFromEcIndex writes a .idx file from the .ecx and .ecj files:
// the .ecx entries are copied verbatim, then one tombstone entry is appended
// for every needle recorded as deleted in the .ecj journal.
func WriteIdxFileFromEcIndex(baseFileName string) (err error) {

	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
	}
	defer ecxFile.Close()

	idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
	}
	defer idxFile.Close()

	if _, copyErr := io.Copy(idxFile, ecxFile); copyErr != nil {
		return fmt.Errorf("copy %s.ecx to %s.idx: %v", baseFileName, baseFileName, copyErr)
	}

	err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
		bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
		_, writeErr := idxFile.Write(bytes)
		return writeErr
	})

	return err
}
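
// The helper below is an illustrative sketch, not part of the original file:
// it replays the same two inputs (.ecx entries via iterateEcxFile, .ecj keys
// via iterateEcjFile) to count live versus deleted needles, which can serve
// as a quick sanity check on a rebuilt .idx. The name countEcNeedles is
// hypothetical.
func countEcNeedles(baseFileName string) (live int, deleted int, err error) {
	deletedKeys := make(map[types.NeedleId]struct{})
	if err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
		deletedKeys[key] = struct{}{}
		return nil
	}); err != nil {
		return
	}
	err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
		if _, isDeleted := deletedKeys[key]; isDeleted || size == types.TombstoneFileSize {
			deleted++
		} else {
			live++
		}
		return nil
	})
	return
}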
// FindDatFileSize calculates the .dat file size from the index entry with the
// largest stop offset. There may be further entries recorded after that one,
// but they are deletions and do not grow the file.
func FindDatFileSize(baseFileName string) (datSize int64, err error) {

	version, err := readEcVolumeVersion(baseFileName)
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
	}

	err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {

		if size == types.TombstoneFileSize {
			return nil
		}

		entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
		if datSize < entryStopOffset {
			datSize = entryStopOffset
		}

		return nil
	})

	return
}
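
// Illustrative sketch, not part of the original file: after WriteDatFile has
// reassembled the volume, the on-disk .dat should match the size computed by
// FindDatFileSize. The name verifyDatFileSize is hypothetical.
func verifyDatFileSize(baseFileName string) error {
	datSize, err := FindDatFileSize(baseFileName)
	if err != nil {
		return err
	}
	fi, statErr := os.Stat(baseFileName + ".dat")
	if statErr != nil {
		return statErr
	}
	if fi.Size() != datSize {
		return fmt.Errorf("%s.dat is %d bytes, expected %d", baseFileName, fi.Size(), datSize)
	}
	return nil
}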
// readEcVolumeVersion reads the needle version from the super block stored at
// the start of the first data shard (.ec00).
func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {

	// find volume version from the .ec00 super block
	datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
	if err != nil {
		return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
	}
	datBackend := backend.NewDiskFile(datFile)

	superBlock, err := super_block.ReadSuperBlock(datBackend)
	datBackend.Close()
	if err != nil {
		return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
	}
	return superBlock.Version, nil
}
// iterateEcxFile reads the .ecx index entry by entry and calls processNeedleFn
// with each needle's key, offset and size.
func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
	}
	defer ecxFile.Close()

	buf := make([]byte, types.NeedleMapEntrySize)
	for {
		n, err := ecxFile.Read(buf)
		if n != types.NeedleMapEntrySize {
			if err == io.EOF {
				return nil
			}
			return err
		}
		key, offset, size := idx.IdxFileEntry(buf)
		if processNeedleFn != nil {
			err = processNeedleFn(key, offset, size)
		}
		if err != nil {
			if err != io.EOF {
				return err
			}
			return nil
		}
	}

}
// iterateEcjFile reads the .ecj deletion journal entry by entry and calls
// processNeedleFn with each deleted needle's key.
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
	ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot open ec journal %s.ecj: %v", baseFileName, openErr)
	}
	defer ecjFile.Close()

	buf := make([]byte, types.NeedleIdSize)
	for {
		n, err := ecjFile.Read(buf)
		if n != types.NeedleIdSize {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if processNeedleFn != nil {
			err = processNeedleFn(types.BytesToNeedleId(buf))
		}
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}

}
// WriteDatFile generates the .dat file from the .ec00 ~ .ec09 data shard files.
func WriteDatFile(baseFileName string, datFileSize int64) error {

	datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if openErr != nil {
		return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
	}
	defer datFile.Close()

	inputFiles := make([]*os.File, DataShardsCount)

	for shardId := 0; shardId < DataShardsCount; shardId++ {
		shardFileName := baseFileName + ToExt(shardId)
		inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
		if openErr != nil {
			return openErr
		}
		defer inputFiles[shardId].Close()
	}

	// full rounds: one large block from each data shard, in shard order
	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
			if w != ErasureCodingLargeBlockSize {
				return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= ErasureCodingLargeBlockSize
		}
	}

	// remaining tail: rounds of at most one small block from each data shard
	for datFileSize > 0 {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
			w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
			if w != toRead {
				return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
			}
			datFileSize -= toRead
		}
	}

	return nil
}
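
// Illustrative sketch, not part of the original file: shardReadSizes mirrors
// the two copy loops above and reports how many bytes WriteDatFile reads from
// each data shard for a given .dat size. The volume data is striped round-robin
// across the .ec00 ~ .ec09 shards, in large blocks while a whole round still
// fits and in small blocks for the remaining tail. The name is hypothetical.
func shardReadSizes(datFileSize int64) []int64 {
	sizes := make([]int64, DataShardsCount)
	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			sizes[shardId] += ErasureCodingLargeBlockSize
			datFileSize -= ErasureCodingLargeBlockSize
		}
	}
	for datFileSize > 0 {
		for shardId := 0; shardId < DataShardsCount; shardId++ {
			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
			sizes[shardId] += toRead
			datFileSize -= toRead
		}
	}
	return sizes
}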
func min(x, y int64) int64 {
	if x > y {
		return y
	}
	return x
}
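
// Illustrative sketch, not part of the original file: decoding an EC volume
// back into a normal volume chains the functions above, assuming the data
// shards .ec00 ~ .ec09 plus the .ecx and .ecj files are all present locally.
// The name decodeEcVolume is hypothetical.
func decodeEcVolume(baseFileName string) error {
	datSize, err := FindDatFileSize(baseFileName)
	if err != nil {
		return err
	}
	if err := WriteDatFile(baseFileName, datSize); err != nil {
		return err
	}
	return WriteIdxFileFromEcIndex(baseFileName)
}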