You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

124 lines
3.4 KiB

3 years ago
  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/util"
  4. "github.com/chrislusf/seaweedfs/weed/util/mem"
  5. "io"
  6. "sync"
  7. "sync/atomic"
  8. )
  9. type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
  10. // ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode
  11. type ChunkedStreamWriter struct {
  12. activeChunks map[LogicChunkIndex]*MemChunk
  13. activeChunksLock sync.Mutex
  14. ChunkSize int64
  15. saveToStorageFn SaveToStorageFunc
  16. sync.Mutex
  17. }
  18. type MemChunk struct {
  19. buf []byte
  20. usage *ChunkWrittenIntervalList
  21. }
  22. var _ = io.WriterAt(&ChunkedStreamWriter{})
  23. func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
  24. return &ChunkedStreamWriter{
  25. ChunkSize: chunkSize,
  26. activeChunks: make(map[LogicChunkIndex]*MemChunk),
  27. }
  28. }
  29. func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
  30. cw.saveToStorageFn = saveToStorageFn
  31. }
  32. func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
  33. cw.Lock()
  34. defer cw.Unlock()
  35. logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
  36. offsetRemainder := off % cw.ChunkSize
  37. memChunk, found := cw.activeChunks[logicChunkIndex]
  38. if !found {
  39. memChunk = &MemChunk{
  40. buf: mem.Allocate(int(cw.ChunkSize)),
  41. usage: newChunkWrittenIntervalList(),
  42. }
  43. cw.activeChunks[logicChunkIndex] = memChunk
  44. }
  45. n = copy(memChunk.buf[offsetRemainder:], p)
  46. memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
  47. if memChunk.usage.IsComplete(cw.ChunkSize) {
  48. if cw.saveToStorageFn != nil {
  49. cw.saveOneChunk(memChunk, logicChunkIndex)
  50. }
  51. }
  52. return
  53. }
  54. func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
  55. cw.Lock()
  56. defer cw.Unlock()
  57. logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
  58. memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
  59. memChunk, found := cw.activeChunks[logicChunkIndex]
  60. if !found {
  61. return
  62. }
  63. for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
  64. logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset)
  65. logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
  66. if logicStart < logicStop {
  67. copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
  68. maxStop = max(maxStop, logicStop)
  69. CheckByteZero("stream writer read", p, logicStart-off, logicStop-off)
  70. }
  71. }
  72. return
  73. }
  74. func (cw *ChunkedStreamWriter) FlushAll() {
  75. cw.Lock()
  76. defer cw.Unlock()
  77. for logicChunkIndex, memChunk := range cw.activeChunks {
  78. if cw.saveToStorageFn != nil {
  79. cw.saveOneChunk(memChunk, logicChunkIndex)
  80. }
  81. }
  82. }
  83. func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
  84. var referenceCounter = int32(memChunk.usage.size())
  85. for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
  86. reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
  87. cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
  88. atomic.AddInt32(&referenceCounter, -1)
  89. if atomic.LoadInt32(&referenceCounter) == 0 {
  90. mem.Free(memChunk.buf)
  91. delete(cw.activeChunks, logicChunkIndex)
  92. }
  93. })
  94. }
  95. }
  96. // Destroy releases used resources
  97. func (cw *ChunkedStreamWriter) Destroy() {
  98. cw.Lock()
  99. defer cw.Unlock()
  100. for t, memChunk := range cw.activeChunks {
  101. mem.Free(memChunk.buf)
  102. delete(cw.activeChunks, t)
  103. }
  104. }