You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
4.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package page_writer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  6. "os"
  7. "sync"
  8. )
  9. var (
  10. _ = PageChunk(&SwapFileChunk{})
  11. )
  12. type ActualChunkIndex int
  13. type SwapFile struct {
  14. dir string
  15. file *os.File
  16. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  17. logicToActualChunkIndexLock sync.Mutex
  18. chunkSize int64
  19. freeActualChunkList []ActualChunkIndex
  20. }
  21. type SwapFileChunk struct {
  22. swapfile *SwapFile
  23. usage *ChunkWrittenIntervalList
  24. logicChunkIndex LogicChunkIndex
  25. actualChunkIndex ActualChunkIndex
  26. }
  27. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  28. return &SwapFile{
  29. dir: dir,
  30. file: nil,
  31. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  32. chunkSize: chunkSize,
  33. }
  34. }
  35. func (sf *SwapFile) FreeResource() {
  36. if sf.file != nil {
  37. sf.file.Close()
  38. os.Remove(sf.file.Name())
  39. }
  40. }
  41. func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  42. if sf.file == nil {
  43. var err error
  44. sf.file, err = os.CreateTemp(sf.dir, "")
  45. if err != nil {
  46. glog.Errorf("create swap file: %v", err)
  47. return nil
  48. }
  49. }
  50. sf.logicToActualChunkIndexLock.Lock()
  51. defer sf.logicToActualChunkIndexLock.Unlock()
  52. actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
  53. if !found {
  54. if len(sf.freeActualChunkList) > 0 {
  55. actualChunkIndex = sf.freeActualChunkList[0]
  56. sf.freeActualChunkList = sf.freeActualChunkList[1:]
  57. } else {
  58. actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
  59. }
  60. sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
  61. }
  62. return &SwapFileChunk{
  63. swapfile: sf,
  64. usage: newChunkWrittenIntervalList(),
  65. logicChunkIndex: logicChunkIndex,
  66. actualChunkIndex: actualChunkIndex,
  67. }
  68. }
  69. func (sc *SwapFileChunk) FreeResource() {
  70. sc.swapfile.logicToActualChunkIndexLock.Lock()
  71. defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
  72. sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
  73. delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
  74. }
  75. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
  76. innerOffset := offset % sc.swapfile.chunkSize
  77. var err error
  78. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  79. if err == nil {
  80. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  81. } else {
  82. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  83. }
  84. return
  85. }
  86. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
  87. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  88. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  89. logicStart := max(off, chunkStartOffset+t.StartOffset)
  90. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  91. if logicStart < logicStop {
  92. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  93. if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  94. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  95. break
  96. }
  97. maxStop = max(maxStop, logicStop)
  98. }
  99. }
  100. return
  101. }
  102. func (sc *SwapFileChunk) IsComplete() bool {
  103. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  104. }
  105. func (sc *SwapFileChunk) WrittenSize() int64 {
  106. return sc.usage.WrittenSize()
  107. }
  108. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  109. if saveFn == nil {
  110. return
  111. }
  112. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  113. data := mem.Allocate(int(t.Size()))
  114. sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  115. reader := util.NewBytesReader(data)
  116. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
  117. })
  118. mem.Free(data)
  119. }
  120. sc.usage = newChunkWrittenIntervalList()
  121. }