package page_writer

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"io"
	"os"
	"sync"
)

type ActualChunkIndex int

// ChunkedFileWriter assumes each write request falls within a single chunk
type ChunkedFileWriter struct {
	dir                     string
	file                    *os.File
	logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
	chunkUsages             []*ChunkWrittenIntervalList
	ChunkSize               int64
	sync.Mutex
}

var _ = io.WriterAt(&ChunkedFileWriter{})
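// NewChunkedFileWriter creates a writer that stages data in chunkSize-sized
// slots of a temp file under dir; the temp file is created lazily on the first write.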
func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
	return &ChunkedFileWriter{
		dir:                     dir,
		file:                    nil,
		logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
		ChunkSize:               chunkSize,
	}
}
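// WriteAt implements io.WriterAt: it creates the backing temp file on the
// first write, translates the logical offset into an offset inside that file,
// writes p there, and records the written interval of the chunk.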
func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
	cw.Lock()
	defer cw.Unlock()

	if cw.file == nil {
		cw.file, err = os.CreateTemp(cw.dir, "")
		if err != nil {
			glog.Errorf("create temp file: %v", err)
			return
		}
	}

	actualOffset, chunkUsage := cw.toActualWriteOffset(off)
	n, err = cw.file.WriteAt(p, actualOffset)
	if err == nil {
		startOffset := off % cw.ChunkSize
		chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
	}
	return
}
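// ReadDataAt copies already-written data overlapping the logical range
// [off, off+len(p)) into p and returns the largest logical stop offset filled,
// or 0 if nothing in that range has been written. Only the chunk containing
// off is consulted.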
func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
	cw.Lock()
	defer cw.Unlock()

	if cw.file == nil {
		return
	}

	logicChunkIndex := off / cw.ChunkSize
	actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
	if chunkUsage != nil {
		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
			logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
			logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
			if logicStart < logicStop {
				actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
				_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
				if err != nil {
					glog.Errorf("reading temp file: %v", err)
					break
				}
				maxStop = max(maxStop, logicStop)
			}
		}
	}
	return
}
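// toActualWriteOffset maps a logical file offset to an offset inside the temp
// file, allocating a new actual chunk slot (and its usage list) the first time
// a logical chunk is touched.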
func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
	logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
	offsetRemainder := logicOffset % cw.ChunkSize
	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
	if found {
		return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
	}
	cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
	chunkUsage = newChunkWrittenIntervalList()
	cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
	return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
}
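// toActualReadOffset looks up the actual chunk index and usage list for the
// chunk containing logicOffset; the usage list is nil if that logical chunk
// has never been written.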
func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
	logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
	if found {
		return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
	}
	return 0, nil
}
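// ProcessEachInterval calls process once per written interval of every
// allocated chunk, handing it the backing temp file and the interval's logical
// chunk index. Chunks are visited in map order, which is not deterministic.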
func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
	for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
		chunkUsage := cw.chunkUsages[actualChunkIndex]
		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
			process(cw.file, logicChunkIndex, t)
		}
	}
}

// Reset releases used resources
func (cw *ChunkedFileWriter) Reset() {
	if cw.file != nil {
		cw.file.Close()
		os.Remove(cw.file.Name())
		cw.file = nil
	}
	cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
	cw.chunkUsages = cw.chunkUsages[:0]
}
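// FileIntervalReader exposes one written interval of a ChunkedFileWriter as an
// io.Reader over the backing temp file.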
type FileIntervalReader struct {
	f           *os.File
	startOffset int64
	stopOffset  int64
	position    int64
}

var _ = io.Reader(&FileIntervalReader{})
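// NewFileIntervalReader returns a reader positioned over the actual file range
// that backs the given logical chunk interval, or nil if that logical chunk
// was never written.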
func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
	actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
	if !found {
		// this should never happen
		return nil
	}
	return &FileIntervalReader{
		f:           cw.file,
		startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
		stopOffset:  int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
		position:    0,
	}
}
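// Read copies the next part of the interval into p and reports io.EOF as soon
// as the interval is fully consumed.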
func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
	readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
	n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
	if err == nil || err == io.EOF {
		fr.position += int64(n)
		if fr.stopOffset-fr.startOffset-fr.position == 0 {
			// return a tiny bit faster
			err = io.EOF
			return
		}
	}
	return
}
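// A minimal usage sketch (illustrative only, not part of the original file;
// it assumes a 2 MiB chunk size and the package's LogicChunkIndex and
// ChunkWrittenInterval types):
//
//	cw := NewChunkedFileWriter(os.TempDir(), 2*1024*1024)
//	cw.WriteAt([]byte("hello"), 0) // stage data at logical offset 0
//	buf := make([]byte, 5)
//	cw.ReadDataAt(buf, 0) // read the staged bytes back
//	cw.ProcessEachInterval(func(f *os.File, idx LogicChunkIndex, iv *ChunkWrittenInterval) {
//		r := NewFileIntervalReader(cw, idx, iv) // stream each written interval
//		io.Copy(io.Discard, r)
//	})
//	cw.Reset() // drop the temp file so the writer can be reused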