You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
3.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "github.com/chrislusf/seaweedfs/weed/util"
  5. "github.com/chrislusf/seaweedfs/weed/util/mem"
  6. "os"
  7. )
  8. var (
  9. _ = PageChunk(&SwapFileChunk{})
  10. )
  11. type ActualChunkIndex int
  12. type SwapFile struct {
  13. dir string
  14. file *os.File
  15. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  16. chunkSize int64
  17. freeActualChunkList []ActualChunkIndex
  18. }
  19. type SwapFileChunk struct {
  20. swapfile *SwapFile
  21. usage *ChunkWrittenIntervalList
  22. logicChunkIndex LogicChunkIndex
  23. actualChunkIndex ActualChunkIndex
  24. }
  25. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  26. return &SwapFile{
  27. dir: dir,
  28. file: nil,
  29. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  30. chunkSize: chunkSize,
  31. }
  32. }
  33. func (sf *SwapFile) FreeResource() {
  34. if sf.file != nil {
  35. sf.file.Close()
  36. os.Remove(sf.file.Name())
  37. }
  38. }
  39. func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  40. if sf.file == nil {
  41. var err error
  42. sf.file, err = os.CreateTemp(sf.dir, "")
  43. if err != nil {
  44. glog.Errorf("create swap file: %v", err)
  45. return nil
  46. }
  47. }
  48. actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
  49. if !found {
  50. if len(sf.freeActualChunkList) > 0 {
  51. actualChunkIndex = sf.freeActualChunkList[0]
  52. sf.freeActualChunkList = sf.freeActualChunkList[1:]
  53. } else {
  54. actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
  55. }
  56. sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
  57. }
  58. return &SwapFileChunk{
  59. swapfile: sf,
  60. usage: newChunkWrittenIntervalList(),
  61. logicChunkIndex: logicChunkIndex,
  62. actualChunkIndex: actualChunkIndex,
  63. }
  64. }
  65. func (sc *SwapFileChunk) FreeResource() {
  66. sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
  67. delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
  68. }
  69. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
  70. innerOffset := offset % sc.swapfile.chunkSize
  71. var err error
  72. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  73. if err == nil {
  74. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  75. } else {
  76. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  77. }
  78. return
  79. }
  80. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
  81. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  82. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  83. logicStart := max(off, chunkStartOffset+t.StartOffset)
  84. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  85. if logicStart < logicStop {
  86. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  87. if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  88. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  89. break
  90. }
  91. maxStop = max(maxStop, logicStop)
  92. }
  93. }
  94. return
  95. }
  96. func (sc *SwapFileChunk) IsComplete() bool {
  97. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  98. }
  99. func (sc *SwapFileChunk) WrittenSize() int64 {
  100. return sc.usage.WrittenSize()
  101. }
  102. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  103. if saveFn == nil {
  104. return
  105. }
  106. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  107. data := mem.Allocate(int(t.Size()))
  108. sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  109. reader := util.NewBytesReader(data)
  110. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
  111. })
  112. mem.Free(data)
  113. }
  114. sc.usage = newChunkWrittenIntervalList()
  115. }