You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

206 lines
5.3 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package filer2
  2. import (
  3. "sort"
  4. "log"
  5. "math"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. )
  8. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  9. for _, c := range chunks {
  10. t := uint64(c.Offset + int64(c.Size))
  11. if size < t {
  12. size = t
  13. }
  14. }
  15. return
  16. }
  17. func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
  18. return
  19. }
  20. func logPrintf(name string, visibles []*visibleInterval) {
  21. return
  22. log.Printf("%s len %d", name, len(visibles))
  23. for _, v := range visibles {
  24. log.Printf("%s: => %+v", name, v)
  25. }
  26. }
  27. func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {
  28. sort.Slice(chunks, func(i, j int) bool {
  29. if chunks[i].Offset < chunks[j].Offset {
  30. return true
  31. }
  32. if chunks[i].Offset == chunks[j].Offset {
  33. return chunks[i].Mtime < chunks[j].Mtime
  34. }
  35. return false
  36. })
  37. if len(chunks) == 0 {
  38. return
  39. }
  40. var parallelIntervals, intervals []*visibleInterval
  41. var minStopInterval, upToDateInterval *visibleInterval
  42. watermarkStart := chunks[0].Offset
  43. for _, chunk := range chunks {
  44. // log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
  45. logPrintf("parallelIntervals", parallelIntervals)
  46. for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
  47. logPrintf("parallelIntervals loop 1", parallelIntervals)
  48. logPrintf("parallelIntervals loop 1 intervals", intervals)
  49. minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
  50. nextStop := min(minStopInterval.stop, chunk.Offset)
  51. intervals = append(intervals, newVisibleInterval(
  52. max(watermarkStart, minStopInterval.start),
  53. nextStop,
  54. upToDateInterval.fileId,
  55. upToDateInterval.modifiedTime,
  56. ))
  57. watermarkStart = nextStop
  58. logPrintf("parallelIntervals loop intervals =>", intervals)
  59. // remove processed intervals, possibly multiple
  60. var remaining []*visibleInterval
  61. for _, interval := range parallelIntervals {
  62. if interval.stop != watermarkStart {
  63. remaining = append(remaining, newVisibleInterval(
  64. interval.start,
  65. interval.stop,
  66. interval.fileId,
  67. interval.modifiedTime,
  68. ))
  69. }
  70. }
  71. parallelIntervals = remaining
  72. logPrintf("parallelIntervals loop 2", parallelIntervals)
  73. logPrintf("parallelIntervals loop 2 intervals", intervals)
  74. }
  75. parallelIntervals = append(parallelIntervals, newVisibleInterval(
  76. chunk.Offset,
  77. chunk.Offset+int64(chunk.Size),
  78. chunk.FileId,
  79. chunk.Mtime,
  80. ))
  81. }
  82. logPrintf("parallelIntervals loop 3", parallelIntervals)
  83. logPrintf("parallelIntervals loop 3 intervals", intervals)
  84. for len(parallelIntervals) > 0 {
  85. minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
  86. intervals = append(intervals, newVisibleInterval(
  87. max(watermarkStart, minStopInterval.start),
  88. minStopInterval.stop,
  89. upToDateInterval.fileId,
  90. upToDateInterval.modifiedTime,
  91. ))
  92. watermarkStart = minStopInterval.stop
  93. // remove processed intervals, possibly multiple
  94. var remaining []*visibleInterval
  95. for _, interval := range parallelIntervals {
  96. if interval.stop != watermarkStart {
  97. remaining = append(remaining, newVisibleInterval(
  98. interval.start,
  99. interval.stop,
  100. interval.fileId,
  101. interval.modifiedTime,
  102. ))
  103. }
  104. }
  105. parallelIntervals = remaining
  106. }
  107. logPrintf("parallelIntervals loop 4", parallelIntervals)
  108. logPrintf("intervals", intervals)
  109. // merge connected intervals, now the intervals are non-intersecting
  110. var lastInterval *visibleInterval
  111. var prevIntervalIndex int
  112. for i, interval := range intervals {
  113. if i == 0 {
  114. prevIntervalIndex = i
  115. continue
  116. }
  117. if intervals[i-1].fileId != interval.fileId ||
  118. intervals[i-1].stop < intervals[i].start {
  119. visibles = append(visibles, newVisibleInterval(
  120. intervals[prevIntervalIndex].start,
  121. intervals[i-1].stop,
  122. intervals[prevIntervalIndex].fileId,
  123. intervals[prevIntervalIndex].modifiedTime,
  124. ))
  125. prevIntervalIndex = i
  126. }
  127. lastInterval = intervals[i]
  128. logPrintf("intervals loop 1 visibles", visibles)
  129. }
  130. if lastInterval != nil {
  131. visibles = append(visibles, newVisibleInterval(
  132. intervals[prevIntervalIndex].start,
  133. lastInterval.stop,
  134. intervals[prevIntervalIndex].fileId,
  135. intervals[prevIntervalIndex].modifiedTime,
  136. ))
  137. }
  138. logPrintf("visibles", visibles)
  139. return
  140. }
  141. func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) {
  142. var latestMtime int64
  143. latestIntervalIndex := 0
  144. minStop := int64(math.MaxInt64)
  145. minIntervalIndex := 0
  146. for i, interval := range intervals {
  147. if minStop > interval.stop {
  148. minIntervalIndex = i
  149. minStop = interval.stop
  150. }
  151. if latestMtime < interval.modifiedTime {
  152. latestMtime = interval.modifiedTime
  153. latestIntervalIndex = i
  154. }
  155. }
  156. minStopInterval = intervals[minIntervalIndex]
  157. upToDateInterval = intervals[latestIntervalIndex]
  158. return
  159. }
  // visibleInterval is one element of the non-overlapping visible intervals
  // computed for a file. Each visible interval maps to one file chunk: it covers
  // the range from start up to stop and records that chunk's file id and
  // modified time.
  type visibleInterval struct {
  	start, stop  int64
  	modifiedTime int64
  	fileId       string
  }
  168. func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
  169. return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
  170. }
  // min returns the smaller of the two int64 arguments.
  func min(x, y int64) int64 {
  	if y < x {
  		return y
  	}
  	return x
  }
  // max returns the larger of the two int64 arguments.
  func max(x, y int64) int64 {
  	if y >= x {
  		return y
  	}
  	return x
  }