87 lines
2.7 KiB

  1. package erasure_coding
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/storage/types"
  4. )
  5. type Interval struct {
  6. BlockIndex int
  7. InnerBlockOffset int64
  8. Size types.Size
  9. IsLargeBlock bool
  10. LargeBlockRowsCount int
  11. }
  12. func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) {
  13. blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
  14. // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
  15. nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount))
  16. for size > 0 {
  17. interval := Interval{
  18. BlockIndex: blockIndex,
  19. InnerBlockOffset: innerBlockOffset,
  20. IsLargeBlock: isLargeBlock,
  21. LargeBlockRowsCount: nLargeBlockRows,
  22. }
  23. blockRemaining := largeBlockLength - innerBlockOffset
  24. if !isLargeBlock {
  25. blockRemaining = smallBlockLength - innerBlockOffset
  26. }
  27. if int64(size) <= blockRemaining {
  28. interval.Size = size
  29. intervals = append(intervals, interval)
  30. return
  31. }
  32. interval.Size = types.Size(blockRemaining)
  33. intervals = append(intervals, interval)
  34. size -= interval.Size
  35. blockIndex += 1
  36. if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount {
  37. isLargeBlock = false
  38. blockIndex = 0
  39. }
  40. innerBlockOffset = 0
  41. }
  42. return
  43. }
  44. func locateOffset(largeBlockLength, smallBlockLength int64, datSize int64, offset int64) (blockIndex int, isLargeBlock bool, innerBlockOffset int64) {
  45. largeRowSize := largeBlockLength * DataShardsCount
  46. nLargeBlockRows := datSize / (largeBlockLength * DataShardsCount)
  47. // if offset is within the large block area
  48. if offset < nLargeBlockRows*largeRowSize {
  49. isLargeBlock = true
  50. blockIndex, innerBlockOffset = locateOffsetWithinBlocks(largeBlockLength, offset)
  51. return
  52. }
  53. isLargeBlock = false
  54. offset -= nLargeBlockRows * largeRowSize
  55. blockIndex, innerBlockOffset = locateOffsetWithinBlocks(smallBlockLength, offset)
  56. return
  57. }
  58. func locateOffsetWithinBlocks(blockLength int64, offset int64) (blockIndex int, innerBlockOffset int64) {
  59. blockIndex = int(offset / blockLength)
  60. innerBlockOffset = offset % blockLength
  61. return
  62. }
  63. func (interval Interval) ToShardIdAndOffset(largeBlockSize, smallBlockSize int64) (ShardId, int64) {
  64. ecFileOffset := interval.InnerBlockOffset
  65. rowIndex := interval.BlockIndex / DataShardsCount
  66. if interval.IsLargeBlock {
  67. ecFileOffset += int64(rowIndex) * largeBlockSize
  68. } else {
  69. ecFileOffset += int64(interval.LargeBlockRowsCount)*largeBlockSize + int64(rowIndex)*smallBlockSize
  70. }
  71. ecFileIndex := interval.BlockIndex % DataShardsCount
  72. return ShardId(ecFileIndex), ecFileOffset
  73. }