You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

119 lines
3.3 KiB

  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/util"
  4. "github.com/chrislusf/seaweedfs/weed/util/mem"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. )
  // SaveToStorageFunc is the callback used to persist one contiguous run of
  // written bytes. It receives a reader over the data, the logical offset the
  // data starts at, the number of bytes, and a cleanupFn to invoke once the
  // data is no longer needed.
  type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
  10. // ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode
  11. type ChunkedStreamWriter struct {
  12. activeChunks map[LogicChunkIndex]*MemChunk
  13. activeChunksLock sync.Mutex
  14. ChunkSize int64
  15. saveToStorageFn SaveToStorageFunc
  16. sync.Mutex
  17. }
  18. type MemChunk struct {
  19. buf []byte
  20. usage *ChunkWrittenIntervalList
  21. }
  22. var _ = io.WriterAt(&ChunkedStreamWriter{})
  23. func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
  24. return &ChunkedStreamWriter{
  25. ChunkSize: chunkSize,
  26. activeChunks: make(map[LogicChunkIndex]*MemChunk),
  27. }
  28. }
  29. func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
  30. cw.saveToStorageFn = saveToStorageFn
  31. }
  32. func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
  33. cw.Lock()
  34. defer cw.Unlock()
  35. logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
  36. offsetRemainder := off % cw.ChunkSize
  37. memChunk, found := cw.activeChunks[logicChunkIndex]
  38. if !found {
  39. memChunk = &MemChunk{
  40. buf: mem.Allocate(int(cw.ChunkSize)),
  41. usage: newChunkWrittenIntervalList(),
  42. }
  43. cw.activeChunks[logicChunkIndex] = memChunk
  44. }
  45. n = copy(memChunk.buf[offsetRemainder:], p)
  46. memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
  47. if memChunk.usage.IsComplete(cw.ChunkSize) {
  48. if cw.saveToStorageFn != nil {
  49. cw.saveOneChunk(memChunk, logicChunkIndex)
  50. delete(cw.activeChunks, logicChunkIndex)
  51. }
  52. }
  53. return
  54. }
  55. func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
  56. cw.Lock()
  57. defer cw.Unlock()
  58. logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
  59. memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
  60. memChunk, found := cw.activeChunks[logicChunkIndex]
  61. if !found {
  62. return
  63. }
  64. for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
  65. logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset)
  66. logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
  67. if logicStart < logicStop {
  68. copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
  69. maxStop = max(maxStop, logicStop)
  70. }
  71. }
  72. return
  73. }
  74. func (cw *ChunkedStreamWriter) FlushAll() {
  75. cw.Lock()
  76. defer cw.Unlock()
  77. for logicChunkIndex, memChunk := range cw.activeChunks {
  78. if cw.saveToStorageFn != nil {
  79. cw.saveOneChunk(memChunk, logicChunkIndex)
  80. delete(cw.activeChunks, logicChunkIndex)
  81. }
  82. }
  83. }
  84. func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
  85. var referenceCounter = int32(memChunk.usage.size())
  86. for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
  87. reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
  88. cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
  89. atomic.AddInt32(&referenceCounter, -1)
  90. if atomic.LoadInt32(&referenceCounter) == 0 {
  91. mem.Free(memChunk.buf)
  92. }
  93. })
  94. }
  95. }
  96. // Reset releases used resources
  97. func (cw *ChunkedStreamWriter) Reset() {
  98. for t, memChunk := range cw.activeChunks {
  99. mem.Free(memChunk.buf)
  100. delete(cw.activeChunks, t)
  101. }
  102. }