You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
4.9 KiB

  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "io"
  5. "os"
  6. "sync"
  7. )
  8. type LogicChunkIndex int
  9. type ActualChunkIndex int
  10. // ChunkedFileWriter assumes the write requests will come in within chunks
  11. type ChunkedFileWriter struct {
  12. dir string
  13. file *os.File
  14. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  15. chunkUsages []*ChunkWrittenIntervalList
  16. ChunkSize int64
  17. sync.Mutex
  18. }
  19. var _ = io.WriterAt(&ChunkedFileWriter{})
  20. func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
  21. return &ChunkedFileWriter{
  22. dir: dir,
  23. file: nil,
  24. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  25. ChunkSize: chunkSize,
  26. }
  27. }
  28. func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
  29. cw.Lock()
  30. defer cw.Unlock()
  31. if cw.file == nil {
  32. cw.file, err = os.CreateTemp(cw.dir, "")
  33. if err != nil {
  34. glog.Errorf("create temp file: %v", err)
  35. return
  36. }
  37. }
  38. actualOffset, chunkUsage := cw.toActualWriteOffset(off)
  39. n, err = cw.file.WriteAt(p, actualOffset)
  40. if err == nil {
  41. startOffset := off % cw.ChunkSize
  42. chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
  43. }
  44. return
  45. }
  46. func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
  47. cw.Lock()
  48. defer cw.Unlock()
  49. if cw.file == nil {
  50. return
  51. }
  52. logicChunkIndex := off / cw.ChunkSize
  53. actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
  54. if chunkUsage != nil {
  55. for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
  56. logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
  57. logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
  58. if logicStart < logicStop {
  59. actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
  60. _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
  61. if err != nil {
  62. glog.Errorf("reading temp file: %v", err)
  63. break
  64. }
  65. maxStop = max(maxStop, logicStop)
  66. }
  67. }
  68. }
  69. return
  70. }
  71. func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
  72. logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
  73. offsetRemainder := logicOffset % cw.ChunkSize
  74. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  75. if found {
  76. return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
  77. }
  78. cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
  79. chunkUsage = newChunkWrittenIntervalList()
  80. cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
  81. return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
  82. }
  83. func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
  84. logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
  85. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  86. if found {
  87. return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
  88. }
  89. return 0, nil
  90. }
  91. func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
  92. for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
  93. chunkUsage := cw.chunkUsages[actualChunkIndex]
  94. for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
  95. if !t.flushed {
  96. process(cw.file, logicChunkIndex, t)
  97. }
  98. }
  99. }
  100. }
  101. // Destroy releases used resources
  102. func (cw *ChunkedFileWriter) Destroy() {
  103. if cw.file != nil {
  104. cw.file.Close()
  105. os.Remove(cw.file.Name())
  106. cw.file = nil
  107. }
  108. cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
  109. cw.chunkUsages = cw.chunkUsages[:0]
  110. }
  111. type FileIntervalReader struct {
  112. f *os.File
  113. startOffset int64
  114. stopOffset int64
  115. position int64
  116. }
  117. var _ = io.Reader(&FileIntervalReader{})
  118. func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
  119. actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  120. if !found {
  121. // this should never happen
  122. return nil
  123. }
  124. return &FileIntervalReader{
  125. f: cw.file,
  126. startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
  127. stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
  128. position: 0,
  129. }
  130. }
  131. func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
  132. readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
  133. n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
  134. if err == nil || err == io.EOF {
  135. fr.position += int64(n)
  136. if fr.stopOffset-fr.startOffset-fr.position == 0 {
  137. // return a tiny bit faster
  138. err = io.EOF
  139. return
  140. }
  141. }
  142. return
  143. }