You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
6.1 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package page_writer
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "sync"
  7. "sync/atomic"
  8. )
// LogicChunkIndex identifies a fixed-size chunk of a file by its ordinal
// position, i.e. offset / ChunkSize.
type LogicChunkIndex int

// UploadPipeline buffers writes into per-chunk pages, seals chunks that are
// full (or evicted over the buffer limit), and uploads sealed chunks
// asynchronously through a shared bounded executor.
type UploadPipeline struct {
	uploaderCount      int32                           // in-flight upload jobs; mutated with sync/atomic
	uploaderCountCond  *sync.Cond                      // own mutex; broadcast when an upload finishes
	filepath           util.FullPath                   // used for log messages
	ChunkSize          int64                           // size of each logical chunk in bytes
	uploaders          *util.LimitedConcurrentExecutor // shared worker pool that runs upload jobs
	saveToStorageFn    SaveToStorageFunc               // invoked by upload jobs to persist a sealed chunk
	writableChunkLimit int                             // per-file cap on buffered writable chunks
	swapFile           *SwapFile                       // disk-backed chunk storage for non-sequential writes
	chunksLock         sync.Mutex                      // guards writableChunks, sealedChunks, activeReadChunks
	writableChunks     map[LogicChunkIndex]PageChunk   // chunks still accepting writes
	sealedChunks       map[LogicChunkIndex]*SealedChunk // chunks queued for / undergoing upload
	activeReadChunks   map[LogicChunkIndex]int         // presumably per-chunk reader counts (see IsLocked, not in view) — TODO confirm
	readerCountCond    *sync.Cond                      // shares chunksLock; signaled when a read completes
}
// SealedChunk wraps a chunk that no longer accepts writes and is waiting to
// be (or being) uploaded. Its resources are freed when the last reference
// is released via FreeReference.
type SealedChunk struct {
	chunk            PageChunk
	referenceCounter int // track uploading or reading processes
}
  29. func (sc *SealedChunk) FreeReference(messageOnFree string) {
  30. sc.referenceCounter--
  31. if sc.referenceCounter == 0 {
  32. glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
  33. sc.chunk.FreeResource()
  34. }
  35. }
  36. func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
  37. t := &UploadPipeline{
  38. ChunkSize: chunkSize,
  39. writableChunks: make(map[LogicChunkIndex]PageChunk),
  40. sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
  41. uploaders: writers,
  42. uploaderCountCond: sync.NewCond(&sync.Mutex{}),
  43. saveToStorageFn: saveToStorageFn,
  44. activeReadChunks: make(map[LogicChunkIndex]int),
  45. writableChunkLimit: bufferChunkLimit,
  46. swapFile: NewSwapFile(swapFileDir, chunkSize),
  47. }
  48. t.readerCountCond = sync.NewCond(&t.chunksLock)
  49. return t
  50. }
  51. func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
  52. up.chunksLock.Lock()
  53. defer up.chunksLock.Unlock()
  54. logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
  55. pageChunk, found := up.writableChunks[logicChunkIndex]
  56. if !found {
  57. if len(up.writableChunks) > up.writableChunkLimit {
  58. // if current file chunks is over the per file buffer count limit
  59. fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
  60. for lci, mc := range up.writableChunks {
  61. chunkFullness := mc.WrittenSize()
  62. if fullness < chunkFullness {
  63. fullestChunkIndex = lci
  64. fullness = chunkFullness
  65. }
  66. }
  67. fullWritableChunk := up.writableChunks[fullestChunkIndex]
  68. delete(up.writableChunks, fullestChunkIndex)
  69. up.moveToSealed(fullWritableChunk, fullestChunkIndex)
  70. // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
  71. }
  72. if isSequential &&
  73. len(up.writableChunks) < up.writableChunkLimit &&
  74. atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
  75. pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
  76. } else {
  77. pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
  78. }
  79. up.writableChunks[logicChunkIndex] = pageChunk
  80. }
  81. n = pageChunk.WriteDataAt(p, off)
  82. up.maybeMoveToSealed(pageChunk, logicChunkIndex)
  83. return
  84. }
  85. func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
  86. logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
  87. up.chunksLock.Lock()
  88. defer func() {
  89. up.readerCountCond.Signal()
  90. up.chunksLock.Unlock()
  91. }()
  92. // read from sealed chunks first
  93. sealedChunk, found := up.sealedChunks[logicChunkIndex]
  94. if found {
  95. sealedChunk.referenceCounter++
  96. }
  97. if found {
  98. maxStop = sealedChunk.chunk.ReadDataAt(p, off)
  99. glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
  100. sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
  101. }
  102. // read from writable chunks last
  103. writableChunk, found := up.writableChunks[logicChunkIndex]
  104. if !found {
  105. return
  106. }
  107. writableMaxStop := writableChunk.ReadDataAt(p, off)
  108. glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
  109. maxStop = max(maxStop, writableMaxStop)
  110. return
  111. }
// FlushAll seals every writable chunk so it gets uploaded, then blocks
// until all in-flight uploads have completed.
func (up *UploadPipeline) FlushAll() {
	up.chunksLock.Lock()
	defer up.chunksLock.Unlock()

	// NOTE(review): moveToSealed temporarily releases chunksLock while it
	// submits the upload job, so writableChunks may be mutated concurrently
	// during this range — confirm callers serialize FlushAll with writers.
	for logicChunkIndex, memChunk := range up.writableChunks {
		up.moveToSealed(memChunk, logicChunkIndex)
	}

	// waitForCurrentWritersToComplete is defined elsewhere; presumably it
	// waits on uploaderCountCond (its own mutex, not chunksLock), which
	// would let uploads signal completion while chunksLock is held here —
	// TODO confirm against its definition.
	up.waitForCurrentWritersToComplete()
}
  120. func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
  121. if memChunk.IsComplete() {
  122. up.moveToSealed(memChunk, logicChunkIndex)
  123. }
  124. }
// moveToSealed converts a writable chunk into a SealedChunk and schedules
// its upload on the shared executor.
//
// Locking contract: the caller must hold up.chunksLock. This function
// releases the lock while submitting the upload job and re-acquires it
// before returning, so the caller's critical section is NOT continuous
// across this call.
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
	atomic.AddInt32(&up.uploaderCount, 1)
	// NOTE(review): the two direct reads of uploaderCount below are not
	// atomic and may race with concurrent uploads; the log line is
	// best-effort only.
	glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)

	// A previously sealed chunk at this index is superseded: drop the
	// reference its upload job held by default.
	if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
	}
	sealedChunk := &SealedChunk{
		chunk:            memChunk,
		referenceCounter: 1, // default 1 is for uploading process
	}
	up.sealedChunks[logicChunkIndex] = sealedChunk
	delete(up.writableChunks, logicChunkIndex)

	// unlock before submitting the uploading jobs
	up.chunksLock.Unlock()
	up.uploaders.Execute(func() {
		// first add to the file chunks
		sealedChunk.chunk.SaveContent(up.saveToStorageFn)

		// notify waiting process
		atomic.AddInt32(&up.uploaderCount, -1)
		glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
		// Lock and Unlock are not required,
		// but it may signal multiple times during one wakeup,
		// and the waiting goroutine may miss some of them!
		up.uploaderCountCond.L.Lock()
		up.uploaderCountCond.Broadcast()
		up.uploaderCountCond.L.Unlock()

		// wait for readers
		up.chunksLock.Lock()
		defer up.chunksLock.Unlock()
		for up.IsLocked(logicChunkIndex) {
			up.readerCountCond.Wait()
		}

		// then remove from sealed chunks
		delete(up.sealedChunks, logicChunkIndex)
		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
	})
	// Re-acquire before returning so the caller's deferred Unlock balances.
	up.chunksLock.Lock()
}
  163. func (up *UploadPipeline) Shutdown() {
  164. up.swapFile.FreeResource()
  165. up.chunksLock.Lock()
  166. defer up.chunksLock.Unlock()
  167. for logicChunkIndex, sealedChunk := range up.sealedChunks {
  168. sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex))
  169. }
  170. }