You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

252 lines
6.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package filer2
  2. import (
  3. "math"
  4. "sort"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. )
  7. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  8. for _, c := range chunks {
  9. t := uint64(c.Offset + int64(c.Size))
  10. if size < t {
  11. size = t
  12. }
  13. }
  14. return
  15. }
  16. func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
  17. visibles := nonOverlappingVisibleIntervals(chunks)
  18. fileIds := make(map[string]bool)
  19. for _, interval := range visibles {
  20. fileIds[interval.fileId] = true
  21. }
  22. for _, chunk := range chunks {
  23. if found := fileIds[chunk.FileId]; found {
  24. compacted = append(compacted, chunk)
  25. } else {
  26. garbage = append(garbage, chunk)
  27. }
  28. }
  29. return
  30. }
  31. func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
  32. fileIds := make(map[string]bool)
  33. for _, interval := range newChunks {
  34. fileIds[interval.FileId] = true
  35. }
  36. for _, chunk := range oldChunks {
  37. if found := fileIds[chunk.FileId]; !found {
  38. unused = append(unused, chunk)
  39. }
  40. }
  41. return
  42. }
// ChunkView describes one contiguous piece of a requested read range and
// where to find its bytes, as produced by ViewFromChunks.
type ChunkView struct {
	// FileId identifies the stored chunk the bytes are read from.
	FileId string
	// Offset is the data starting location within that file id.
	Offset int64
	// Size is the number of bytes covered by this view.
	Size uint64
	// LogicOffset is the position in the logical file where this view begins.
	LogicOffset int64
}
  49. func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) {
  50. visibles := nonOverlappingVisibleIntervals(chunks)
  51. stop := offset + int64(size)
  52. for _, chunk := range visibles {
  53. if chunk.start <= offset && offset < chunk.stop && offset < stop {
  54. views = append(views, &ChunkView{
  55. FileId: chunk.fileId,
  56. Offset: offset - chunk.start, // offset is the data starting location in this file id
  57. Size: uint64(min(chunk.stop, stop) - offset),
  58. LogicOffset: offset,
  59. })
  60. offset = min(chunk.stop, stop)
  61. }
  62. }
  63. return views
  64. }
// logPrintf is a debugging hook for dumping a named list of visible intervals.
// Its logging statements are commented out, so calling it currently does
// nothing; uncomment the body to trace nonOverlappingVisibleIntervals.
func logPrintf(name string, visibles []*visibleInterval) {
	/*
		log.Printf("%s len %d", name, len(visibles))
		for _, v := range visibles {
			log.Printf("%s: => %+v", name, v)
		}
	*/
}
// nonOverlappingVisibleIntervals flattens possibly-overlapping chunks into a
// list of disjoint intervals, each attributed to a single file id.
//
// Side effect: chunks is sorted in place by Offset, then by Mtime. Where
// several chunks cover the same range, the range is attributed to the
// candidate findMinStopInterval reports as most recently modified. Adjacent
// pieces that share a file id and touch each other are merged at the end.
// Returns nil when chunks is empty.
func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {

	// order by start offset; chunks starting at the same offset go oldest first
	sort.Slice(chunks, func(i, j int) bool {
		if chunks[i].Offset < chunks[j].Offset {
			return true
		}
		if chunks[i].Offset == chunks[j].Offset {
			return chunks[i].Mtime < chunks[j].Mtime
		}
		return false
	})

	if len(chunks) == 0 {
		return
	}

	// parallelIntervals holds the chunks that have been started but not yet
	// fully emitted; intervals collects the disjoint pieces emitted so far.
	var parallelIntervals, intervals []*visibleInterval
	var minStopInterval, upToDateInterval *visibleInterval
	// watermarkStart is the position up to which pieces have been emitted
	watermarkStart := chunks[0].Offset
	for _, chunk := range chunks {
		// log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
		logPrintf("parallelIntervals", parallelIntervals)
		// before admitting this chunk, emit everything that lies ahead of its start
		for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
			logPrintf("parallelIntervals loop 1", parallelIntervals)
			logPrintf("parallelIntervals loop 1 intervals", intervals)
			minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
			// the piece ends at the earliest stop, or at the new chunk's start if that comes first
			nextStop := min(minStopInterval.stop, chunk.Offset)
			intervals = append(intervals, newVisibleInterval(
				max(watermarkStart, minStopInterval.start),
				nextStop,
				upToDateInterval.fileId,
				upToDateInterval.modifiedTime,
			))
			watermarkStart = nextStop
			logPrintf("parallelIntervals loop intervals =>", intervals)

			// remove processed intervals, possibly multiple
			var remaining []*visibleInterval
			for _, interval := range parallelIntervals {
				if interval.stop != watermarkStart {
					remaining = append(remaining, interval)
				}
			}
			parallelIntervals = remaining
			logPrintf("parallelIntervals loop 2", parallelIntervals)
			logPrintf("parallelIntervals loop 2 intervals", intervals)
		}
		// the chunk itself becomes a pending interval covering [Offset, Offset+Size)
		parallelIntervals = append(parallelIntervals, newVisibleInterval(
			chunk.Offset,
			chunk.Offset+int64(chunk.Size),
			chunk.FileId,
			chunk.Mtime,
		))
	}

	logPrintf("parallelIntervals loop 3", parallelIntervals)
	logPrintf("parallelIntervals loop 3 intervals", intervals)

	// all chunks have been admitted; drain the pending intervals, earliest stop first
	for len(parallelIntervals) > 0 {
		minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
		intervals = append(intervals, newVisibleInterval(
			max(watermarkStart, minStopInterval.start),
			minStopInterval.stop,
			upToDateInterval.fileId,
			upToDateInterval.modifiedTime,
		))
		watermarkStart = minStopInterval.stop

		// remove processed intervals, possibly multiple
		var remaining []*visibleInterval
		for _, interval := range parallelIntervals {
			if interval.stop != watermarkStart {
				remaining = append(remaining, interval)
			}
		}
		parallelIntervals = remaining
	}
	logPrintf("parallelIntervals loop 4", parallelIntervals)
	logPrintf("intervals", intervals)

	// merge connected intervals, now the intervals are non-intersecting
	// lastIntervalIndex tracks the most recent piece visited;
	// prevIntervalIndex is the first piece of the run currently being merged.
	var lastIntervalIndex int
	var prevIntervalIndex int
	for i, interval := range intervals {
		if i == 0 {
			prevIntervalIndex = i
			lastIntervalIndex = i
			continue
		}
		// a change of file id, or a gap before this piece, closes the current run
		if intervals[i-1].fileId != interval.fileId ||
			intervals[i-1].stop < intervals[i].start {
			visibles = append(visibles, newVisibleInterval(
				intervals[prevIntervalIndex].start,
				intervals[i-1].stop,
				intervals[prevIntervalIndex].fileId,
				intervals[prevIntervalIndex].modifiedTime,
			))
			prevIntervalIndex = i
		}
		lastIntervalIndex = i
		logPrintf("intervals loop 1 visibles", visibles)
	}

	// flush the final run, which the loop above never closes
	visibles = append(visibles, newVisibleInterval(
		intervals[prevIntervalIndex].start,
		intervals[lastIntervalIndex].stop,
		intervals[prevIntervalIndex].fileId,
		intervals[prevIntervalIndex].modifiedTime,
	))

	logPrintf("visibles", visibles)

	return
}
  176. func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) {
  177. var latestMtime int64
  178. latestIntervalIndex := 0
  179. minStop := int64(math.MaxInt64)
  180. minIntervalIndex := 0
  181. for i, interval := range intervals {
  182. if minStop > interval.stop {
  183. minIntervalIndex = i
  184. minStop = interval.stop
  185. }
  186. if latestMtime < interval.modifiedTime {
  187. latestMtime = interval.modifiedTime
  188. latestIntervalIndex = i
  189. }
  190. }
  191. minStopInterval = intervals[minIntervalIndex]
  192. upToDateInterval = intervals[latestIntervalIndex]
  193. return
  194. }
  195. // find non-overlapping visible intervals
  196. // visible interval map to one file chunk
  197. type visibleInterval struct {
  198. start int64
  199. stop int64
  200. modifiedTime int64
  201. fileId string
  202. }
  203. func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
  204. return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
  205. }
  206. func min(x, y int64) int64 {
  207. if x <= y {
  208. return x
  209. }
  210. return y
  211. }
  212. func max(x, y int64) int64 {
  213. if x > y {
  214. return x
  215. }
  216. return y
  217. }