package page_writer

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/mem"
)

// UploadPipeline buffers writes into fixed-size in-memory chunks, seals chunks
// that are completely written (or explicitly flushed), and uploads sealed
// chunks to storage through a limited pool of concurrent writers.
type UploadPipeline struct {
	writableChunks     map[LogicChunkIndex]*MemChunk
	writableChunksLock sync.Mutex
	sealedChunks       map[LogicChunkIndex]*SealedChunk
	sealedChunksLock   sync.Mutex
	ChunkSize          int64
	writers            *util.LimitedConcurrentExecutor
	activeWriterCond   *sync.Cond
	activeWriterCount  int32
	saveToStorageFn    SaveToStorageFunc
	filepath           util.FullPath
}

// SealedChunk is a chunk that is no longer writable; it stays readable until
// its reference count drops to zero.
type SealedChunk struct {
	chunk            *MemChunk
	referenceCounter int // track uploading or reading processes
}

// FreeReference drops one reference and returns the chunk buffer to the memory
// pool once no uploader or reader holds it anymore.
func (sc *SealedChunk) FreeReference(messageOnFree string) {
	sc.referenceCounter--
	if sc.referenceCounter == 0 {
		glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
		mem.Free(sc.chunk.buf)
	}
}

func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
	return &UploadPipeline{
		ChunkSize:        chunkSize,
		writableChunks:   make(map[LogicChunkIndex]*MemChunk),
		sealedChunks:     make(map[LogicChunkIndex]*SealedChunk),
		writers:          writers,
		activeWriterCond: sync.NewCond(&sync.Mutex{}),
		saveToStorageFn:  saveToStorageFn,
		filepath:         filepath,
	}
}

// SaveDataAt copies p into the writable chunk covering offset off, allocating
// the chunk on first touch, and seals the chunk once it is fully written.
func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
	cw.writableChunksLock.Lock()
	defer cw.writableChunksLock.Unlock()

	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
	offsetRemainder := off % cw.ChunkSize

	memChunk, found := cw.writableChunks[logicChunkIndex]
	if !found {
		memChunk = &MemChunk{
			buf:   mem.Allocate(int(cw.ChunkSize)),
			usage: newChunkWrittenIntervalList(),
		}
		cw.writableChunks[logicChunkIndex] = memChunk
	}
	n = copy(memChunk.buf[offsetRemainder:], p)
	memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
	cw.maybeMoveToSealed(memChunk, logicChunkIndex)

	return
}

// MaybeReadDataAt serves reads from data that is still in memory: sealed
// chunks first, then the writable chunk for the same logic chunk index.
func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)

	// read from sealed chunks first
	cw.sealedChunksLock.Lock()
	sealedChunk, found := cw.sealedChunks[logicChunkIndex]
	if found {
		sealedChunk.referenceCounter++
	}
	cw.sealedChunksLock.Unlock()
	if found {
		maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize)
		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex))
	}

	// read from writable chunks last
	cw.writableChunksLock.Lock()
	defer cw.writableChunksLock.Unlock()
	writableChunk, found := cw.writableChunks[logicChunkIndex]
	if !found {
		return
	}
	maxStop = max(maxStop, readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize))

	return
}

// FlushAll seals every writable chunk and waits for all in-flight uploads to finish.
func (cw *UploadPipeline) FlushAll() {
	cw.writableChunksLock.Lock()
	defer cw.writableChunksLock.Unlock()

	for logicChunkIndex, memChunk := range cw.writableChunks {
		cw.moveToSealed(memChunk, logicChunkIndex)
	}

	cw.waitForCurrentWritersToComplete()
}

// waitForCurrentWritersToComplete blocks until all scheduled chunk uploads have finished.
func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
	cw.activeWriterCond.L.Lock()
	for {
		t := atomic.LoadInt32(&cw.activeWriterCount)
		if t <= 0 {
			break
		}
		glog.V(4).Infof("activeWriterCond is %d", t)
		cw.activeWriterCond.Wait()
	}
	cw.activeWriterCond.L.Unlock()
}

func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
	if memChunk.usage.IsComplete(cw.ChunkSize) {
		cw.moveToSealed(memChunk, logicChunkIndex)
	}
}

// moveToSealed replaces any previously sealed chunk at the same index, moves the
// writable chunk into the sealed map, and schedules an asynchronous upload.
func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
	atomic.AddInt32(&cw.activeWriterCount, 1)
	glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount)

	cw.sealedChunksLock.Lock()

	if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found {
		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex))
	}
	sealedChunk := &SealedChunk{
		chunk:            memChunk,
		referenceCounter: 1, // default 1 is for uploading process
	}
	cw.sealedChunks[logicChunkIndex] = sealedChunk
	delete(cw.writableChunks, logicChunkIndex)

	cw.sealedChunksLock.Unlock()

	cw.writers.Execute(func() {
		// first add to the file chunks
		cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)

		// then remove from sealed chunks
		cw.sealedChunksLock.Lock()
		defer cw.sealedChunksLock.Unlock()
		delete(cw.sealedChunks, logicChunkIndex)
		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))

		atomic.AddInt32(&cw.activeWriterCount, -1)
		glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount)

		// Holding the lock around Broadcast is not strictly required by sync.Cond,
		// but it guarantees the waiter cannot be between its check of
		// activeWriterCount and its call to Wait, which would miss this wakeup.
		cw.activeWriterCond.L.Lock()
		cw.activeWriterCond.Broadcast()
		cw.activeWriterCond.L.Unlock()
	})
}

// saveOneChunk uploads each written interval of the chunk via saveToStorageFn.
func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
	if cw.saveToStorageFn == nil {
		return
	}
	for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
		reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
		cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
		})
	}
}

// readMemChunk copies the overlap between the requested range [off, off+len(p))
// and each written interval of memChunk into p, returning the largest logical
// offset that was satisfied (0 if nothing overlapped).
func readMemChunk(memChunk *MemChunk, p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) {
	memChunkBaseOffset := int64(logicChunkIndex) * chunkSize
	for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
		logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset)
		logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
		if logicStart < logicStop {
			copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
			maxStop = max(maxStop, logicStop)
		}
	}
	return
}

// Shutdown is currently a no-op.
func (cw *UploadPipeline) Shutdown() {
}
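
For orientation, here is a minimal usage sketch of the pipeline's exported surface (NewUploadPipeline, SaveDataAt, MaybeReadDataAt, FlushAll). It is not part of the original file: util.NewLimitedConcurrentExecutor and util.FullPath are assumed to work as suggested by the imports above, the path and sizes are invented, and a nil SaveToStorageFunc is passed so sealed chunks are freed without actually uploading anything.

// exampleUsage is an illustrative sketch (same package), not part of the original file.
func exampleUsage() {
	pipeline := NewUploadPipeline(
		util.FullPath("/buckets/demo/file.dat"), // hypothetical file path
		util.NewLimitedConcurrentExecutor(4),    // assumed constructor: at most 4 concurrent chunk uploads
		2*1024*1024,                             // 2 MiB chunk size
		nil,                                     // nil SaveToStorageFunc: sealed chunks are freed without uploading
	)

	data := []byte("hello")
	written := pipeline.SaveDataAt(data, 0) // buffer a write at file offset 0

	buf := make([]byte, written)
	pipeline.MaybeReadDataAt(buf, 0) // served from the in-memory writable chunk

	pipeline.FlushAll() // seal all writable chunks and wait for the writer pool to drain
}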