You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

152 lines
4.5 KiB

  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "io"
  5. "os"
  6. "sync"
  7. )
  8. // ChunkedFileWriter assumes the write requests will come in within chunks
  9. type ChunkedFileWriter struct {
  10. dir string
  11. file *os.File
  12. logicToActualChunkIndex map[int]int
  13. chunkUsages []*ChunkWrittenIntervalList
  14. ChunkSize int64
  15. sync.Mutex
  16. }
  17. var _ = io.WriterAt(&ChunkedFileWriter{})
  18. func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
  19. return &ChunkedFileWriter{
  20. dir: dir,
  21. file: nil,
  22. logicToActualChunkIndex: make(map[int]int),
  23. ChunkSize: chunkSize,
  24. }
  25. }
  26. func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
  27. cw.Lock()
  28. defer cw.Unlock()
  29. if cw.file == nil {
  30. cw.file, err = os.CreateTemp(cw.dir, "")
  31. if err != nil {
  32. glog.Errorf("create temp file: %v", err)
  33. return
  34. }
  35. }
  36. actualOffset, chunkUsage := cw.toActualWriteOffset(off)
  37. n, err = cw.file.WriteAt(p, actualOffset)
  38. if err == nil {
  39. startOffset := off % cw.ChunkSize
  40. chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
  41. }
  42. return
  43. }
  44. func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
  45. cw.Lock()
  46. defer cw.Unlock()
  47. if cw.file == nil {
  48. return
  49. }
  50. logicChunkIndex := off / cw.ChunkSize
  51. actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
  52. if chunkUsage != nil {
  53. for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
  54. logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
  55. logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
  56. if logicStart < logicStop {
  57. actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
  58. _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
  59. if err != nil {
  60. glog.Errorf("reading temp file: %v", err)
  61. break
  62. }
  63. maxStop = max(maxStop, logicStop)
  64. }
  65. }
  66. }
  67. return
  68. }
  69. func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
  70. logicChunkIndex := int(logicOffset / cw.ChunkSize)
  71. offsetRemainder := logicOffset % cw.ChunkSize
  72. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  73. if found {
  74. return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
  75. }
  76. cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages)
  77. chunkUsage = newChunkWrittenIntervalList()
  78. cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
  79. return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
  80. }
  81. func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *ChunkWrittenIntervalList) {
  82. logicChunkIndex := int(logicOffset / cw.ChunkSize)
  83. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  84. if found {
  85. return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
  86. }
  87. return 0, nil
  88. }
  89. func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *ChunkWrittenInterval)) {
  90. for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
  91. chunkUsage := cw.chunkUsages[actualChunkIndex]
  92. for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
  93. process(cw.file, logicChunkIndex, t)
  94. }
  95. }
  96. }
  97. func (cw *ChunkedFileWriter) Destroy() {
  98. if cw.file != nil {
  99. cw.file.Close()
  100. os.Remove(cw.file.Name())
  101. }
  102. }
  103. type FileIntervalReader struct {
  104. f *os.File
  105. startOffset int64
  106. stopOffset int64
  107. position int64
  108. }
  109. var _ = io.Reader(&FileIntervalReader{})
  110. func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *ChunkWrittenInterval) *FileIntervalReader {
  111. actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  112. if !found {
  113. // this should never happen
  114. return nil
  115. }
  116. return &FileIntervalReader{
  117. f: cw.file,
  118. startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
  119. stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
  120. position: 0,
  121. }
  122. }
  123. func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
  124. readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
  125. n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
  126. if err == nil || err == io.EOF {
  127. fr.position += int64(n)
  128. if fr.stopOffset-fr.startOffset-fr.position == 0 {
  129. // return a tiny bit faster
  130. err = io.EOF
  131. return
  132. }
  133. }
  134. return
  135. }