You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

162 lines
4.5 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package page_writer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  6. "os"
  7. "sync"
  8. )
  9. var (
  10. _ = PageChunk(&SwapFileChunk{})
  11. )
  12. type ActualChunkIndex int
  13. type SwapFile struct {
  14. dir string
  15. file *os.File
  16. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  17. logicToActualChunkIndexLock sync.Mutex
  18. chunkSize int64
  19. freeActualChunkList []ActualChunkIndex
  20. }
  21. type SwapFileChunk struct {
  22. sync.RWMutex
  23. swapfile *SwapFile
  24. usage *ChunkWrittenIntervalList
  25. logicChunkIndex LogicChunkIndex
  26. actualChunkIndex ActualChunkIndex
  27. lastModifiedTsNs int64
  28. }
  29. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  30. return &SwapFile{
  31. dir: dir,
  32. file: nil,
  33. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  34. chunkSize: chunkSize,
  35. }
  36. }
  37. func (sf *SwapFile) FreeResource() {
  38. if sf.file != nil {
  39. sf.file.Close()
  40. os.Remove(sf.file.Name())
  41. }
  42. }
  43. func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  44. if sf.file == nil {
  45. var err error
  46. sf.file, err = os.CreateTemp(sf.dir, "")
  47. if err != nil {
  48. glog.Errorf("create swap file: %v", err)
  49. return nil
  50. }
  51. }
  52. sf.logicToActualChunkIndexLock.Lock()
  53. defer sf.logicToActualChunkIndexLock.Unlock()
  54. actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
  55. if !found {
  56. if len(sf.freeActualChunkList) > 0 {
  57. actualChunkIndex = sf.freeActualChunkList[0]
  58. sf.freeActualChunkList = sf.freeActualChunkList[1:]
  59. } else {
  60. actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
  61. }
  62. sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
  63. }
  64. return &SwapFileChunk{
  65. swapfile: sf,
  66. usage: newChunkWrittenIntervalList(),
  67. logicChunkIndex: logicChunkIndex,
  68. actualChunkIndex: actualChunkIndex,
  69. }
  70. }
  71. func (sc *SwapFileChunk) FreeResource() {
  72. sc.swapfile.logicToActualChunkIndexLock.Lock()
  73. defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
  74. sc.Lock()
  75. defer sc.Unlock()
  76. sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
  77. delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
  78. }
  79. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
  80. sc.Lock()
  81. defer sc.Unlock()
  82. if sc.lastModifiedTsNs > tsNs {
  83. println("write old data2", tsNs-sc.lastModifiedTsNs, "ns")
  84. }
  85. sc.lastModifiedTsNs = tsNs
  86. innerOffset := offset % sc.swapfile.chunkSize
  87. var err error
  88. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  89. if err == nil {
  90. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  91. } else {
  92. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  93. }
  94. return
  95. }
  96. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
  97. sc.RLock()
  98. defer sc.RUnlock()
  99. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  100. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  101. logicStart := max(off, chunkStartOffset+t.StartOffset)
  102. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  103. if logicStart < logicStop {
  104. if sc.lastModifiedTsNs > tsNs {
  105. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  106. if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  107. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  108. break
  109. }
  110. maxStop = max(maxStop, logicStop)
  111. } else {
  112. println("read old data2", tsNs-sc.lastModifiedTsNs, "ns")
  113. }
  114. }
  115. }
  116. return
  117. }
  118. func (sc *SwapFileChunk) IsComplete() bool {
  119. sc.RLock()
  120. defer sc.RUnlock()
  121. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  122. }
  123. func (sc *SwapFileChunk) LastModifiedTsNs() int64 {
  124. return sc.lastModifiedTsNs
  125. }
  126. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  127. if saveFn == nil {
  128. return
  129. }
  130. sc.Lock()
  131. defer sc.Unlock()
  132. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  133. data := mem.Allocate(int(t.Size()))
  134. sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  135. reader := util.NewBytesReader(data)
  136. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), sc.lastModifiedTsNs, func() {
  137. })
  138. mem.Free(data)
  139. }
  140. sc.usage = newChunkWrittenIntervalList()
  141. }