You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

269 lines
7.7 KiB

4 years ago
  1. package filesys
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "io"
  5. "os"
  6. )
  7. type WrittenIntervalNode struct {
  8. DataOffset int64
  9. TempOffset int64
  10. Size int64
  11. Next *WrittenIntervalNode
  12. }
  13. type WrittenIntervalLinkedList struct {
  14. tempFile *os.File
  15. Head *WrittenIntervalNode
  16. Tail *WrittenIntervalNode
  17. }
  18. type WrittenContinuousIntervals struct {
  19. tempFile *os.File
  20. lists []*WrittenIntervalLinkedList
  21. }
  22. func (list *WrittenIntervalLinkedList) Offset() int64 {
  23. return list.Head.DataOffset
  24. }
  25. func (list *WrittenIntervalLinkedList) Size() int64 {
  26. return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset
  27. }
  28. func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) {
  29. // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
  30. if list.Tail.TempOffset+list.Tail.Size == node.TempOffset {
  31. // already connected
  32. list.Tail.Size += node.Size
  33. } else {
  34. list.Tail.Next = node
  35. list.Tail = node
  36. }
  37. }
  38. func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) {
  39. // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
  40. node.Next = list.Head
  41. list.Head = node
  42. }
  43. func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
  44. t := list.Head
  45. for {
  46. nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
  47. if nodeStart < nodeStop {
  48. // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
  49. list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], nodeStart-t.TempOffset)
  50. }
  51. if t.Next == nil {
  52. break
  53. }
  54. t = t.Next
  55. }
  56. }
  57. func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
  58. for _, list := range c.lists {
  59. total += list.Size()
  60. }
  61. return
  62. }
  63. func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList {
  64. var nodes []*WrittenIntervalNode
  65. for t := list.Head; t != nil; t = t.Next {
  66. nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
  67. if nodeStart >= nodeStop {
  68. // skip non overlapping WrittenIntervalNode
  69. continue
  70. }
  71. nodes = append(nodes, &WrittenIntervalNode{
  72. TempOffset: t.TempOffset + nodeStart - t.DataOffset,
  73. DataOffset: nodeStart,
  74. Size: nodeStop - nodeStart,
  75. Next: nil,
  76. })
  77. }
  78. for i := 1; i < len(nodes); i++ {
  79. nodes[i-1].Next = nodes[i]
  80. }
  81. return &WrittenIntervalLinkedList{
  82. tempFile: list.tempFile,
  83. Head: nodes[0],
  84. Tail: nodes[len(nodes)-1],
  85. }
  86. }
  87. func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
  88. interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
  89. // append to the tail and return
  90. if len(c.lists) == 1 {
  91. lastSpan := c.lists[0]
  92. if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
  93. lastSpan.addNodeToTail(interval)
  94. return
  95. }
  96. }
  97. var newLists []*WrittenIntervalLinkedList
  98. for _, list := range c.lists {
  99. // if list is to the left of new interval, add to the new list
  100. if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
  101. newLists = append(newLists, list)
  102. }
  103. // if list is to the right of new interval, add to the new list
  104. if interval.DataOffset+interval.Size <= list.Head.DataOffset {
  105. newLists = append(newLists, list)
  106. }
  107. // if new interval overwrite the right part of the list
  108. if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size {
  109. // create a new list of the left part of existing list
  110. newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset))
  111. }
  112. // if new interval overwrite the left part of the list
  113. if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size {
  114. // create a new list of the right part of existing list
  115. newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size))
  116. }
  117. // skip anything that is fully overwritten by the new interval
  118. }
  119. c.lists = newLists
  120. // add the new interval to the lists, connecting neighbor lists
  121. var prevList, nextList *WrittenIntervalLinkedList
  122. for _, list := range c.lists {
  123. if list.Head.DataOffset == interval.DataOffset+interval.Size {
  124. nextList = list
  125. break
  126. }
  127. }
  128. for _, list := range c.lists {
  129. if list.Head.DataOffset+list.Size() == dataOffset {
  130. list.addNodeToTail(interval)
  131. prevList = list
  132. break
  133. }
  134. }
  135. if prevList != nil && nextList != nil {
  136. // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
  137. prevList.Tail.Next = nextList.Head
  138. prevList.Tail = nextList.Tail
  139. c.removeList(nextList)
  140. } else if nextList != nil {
  141. // add to head was not done when checking
  142. nextList.addNodeToHead(interval)
  143. }
  144. if prevList == nil && nextList == nil {
  145. c.lists = append(c.lists, &WrittenIntervalLinkedList{
  146. tempFile: c.tempFile,
  147. Head: interval,
  148. Tail: interval,
  149. })
  150. }
  151. return
  152. }
  153. func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
  154. var maxSize int64
  155. maxIndex := -1
  156. for k, list := range c.lists {
  157. if maxSize <= list.Size() {
  158. maxSize = list.Size()
  159. maxIndex = k
  160. }
  161. }
  162. if maxSize <= 0 {
  163. return nil
  164. }
  165. t := c.lists[maxIndex]
  166. t.tempFile = c.tempFile
  167. c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
  168. return t
  169. }
  170. func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
  171. index := -1
  172. for k, list := range c.lists {
  173. if list.Offset() == target.Offset() {
  174. index = k
  175. }
  176. }
  177. if index < 0 {
  178. return
  179. }
  180. c.lists = append(c.lists[0:index], c.lists[index+1:]...)
  181. }
  182. func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
  183. for _, list := range c.lists {
  184. start := max(startOffset, list.Offset())
  185. stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
  186. if start < stop {
  187. list.ReadData(data[start-startOffset:], start, stop)
  188. maxStop = max(maxStop, stop)
  189. }
  190. }
  191. return
  192. }
  193. func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader {
  194. // TODO: optimize this to avoid another loop
  195. var readers []io.Reader
  196. for t := l.Head; ; t = t.Next {
  197. startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
  198. if startOffset < stopOffset {
  199. readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
  200. }
  201. if t.Next == nil {
  202. break
  203. }
  204. }
  205. if len(readers) == 1 {
  206. return readers[0]
  207. }
  208. return io.MultiReader(readers...)
  209. }
  210. type FileSectionReader struct {
  211. file *os.File
  212. Offset int64
  213. dataStart int64
  214. dataStop int64
  215. }
  216. var _ = io.Reader(&FileSectionReader{})
  217. func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
  218. return &FileSectionReader{
  219. file: tempfile,
  220. Offset: offset,
  221. dataStart: dataOffset,
  222. dataStop: dataOffset + size,
  223. }
  224. }
  225. func (f *FileSectionReader) Read(p []byte) (n int, err error) {
  226. dataLen := min(f.dataStop-f.Offset, int64(len(p)))
  227. if dataLen < 0 {
  228. return 0, io.EOF
  229. }
  230. glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen)
  231. n, err = f.file.ReadAt(p[:dataLen], f.Offset)
  232. if n > 0 {
  233. f.Offset += int64(n)
  234. } else {
  235. err = io.EOF
  236. }
  237. return
  238. }