You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

225 lines
5.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package filer2
  2. import (
  3. "sort"
  4. "log"
  5. "math"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. )
  8. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  9. for _, c := range chunks {
  10. t := uint64(c.Offset + int64(c.Size))
  11. if size < t {
  12. size = t
  13. }
  14. }
  15. return
  16. }
  17. func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
  18. visibles := nonOverlappingVisibleIntervals(chunks)
  19. fileIds := make(map[string]bool)
  20. for _, interval := range visibles {
  21. fileIds[interval.fileId] = true
  22. }
  23. for _, chunk := range chunks {
  24. if found := fileIds[chunk.FileId]; found {
  25. compacted = append(compacted, chunk)
  26. } else {
  27. garbage = append(garbage, chunk)
  28. }
  29. }
  30. return
  31. }
  32. func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
  33. fileIds := make(map[string]bool)
  34. for _, interval := range newChunks {
  35. fileIds[interval.FileId] = true
  36. }
  37. for _, chunk := range oldChunks {
  38. if found := fileIds[chunk.FileId]; !found {
  39. unused = append(unused, chunk)
  40. }
  41. }
  42. return
  43. }
  44. func logPrintf(name string, visibles []*visibleInterval) {
  45. return
  46. log.Printf("%s len %d", name, len(visibles))
  47. for _, v := range visibles {
  48. log.Printf("%s: => %+v", name, v)
  49. }
  50. }
  51. func nonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []*visibleInterval) {
  52. sort.Slice(chunks, func(i, j int) bool {
  53. if chunks[i].Offset < chunks[j].Offset {
  54. return true
  55. }
  56. if chunks[i].Offset == chunks[j].Offset {
  57. return chunks[i].Mtime < chunks[j].Mtime
  58. }
  59. return false
  60. })
  61. if len(chunks) == 0 {
  62. return
  63. }
  64. var parallelIntervals, intervals []*visibleInterval
  65. var minStopInterval, upToDateInterval *visibleInterval
  66. watermarkStart := chunks[0].Offset
  67. for _, chunk := range chunks {
  68. // log.Printf("checking chunk: [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
  69. logPrintf("parallelIntervals", parallelIntervals)
  70. for len(parallelIntervals) > 0 && watermarkStart < chunk.Offset {
  71. logPrintf("parallelIntervals loop 1", parallelIntervals)
  72. logPrintf("parallelIntervals loop 1 intervals", intervals)
  73. minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
  74. nextStop := min(minStopInterval.stop, chunk.Offset)
  75. intervals = append(intervals, newVisibleInterval(
  76. max(watermarkStart, minStopInterval.start),
  77. nextStop,
  78. upToDateInterval.fileId,
  79. upToDateInterval.modifiedTime,
  80. ))
  81. watermarkStart = nextStop
  82. logPrintf("parallelIntervals loop intervals =>", intervals)
  83. // remove processed intervals, possibly multiple
  84. var remaining []*visibleInterval
  85. for _, interval := range parallelIntervals {
  86. if interval.stop != watermarkStart {
  87. remaining = append(remaining, interval)
  88. }
  89. }
  90. parallelIntervals = remaining
  91. logPrintf("parallelIntervals loop 2", parallelIntervals)
  92. logPrintf("parallelIntervals loop 2 intervals", intervals)
  93. }
  94. parallelIntervals = append(parallelIntervals, newVisibleInterval(
  95. chunk.Offset,
  96. chunk.Offset+int64(chunk.Size),
  97. chunk.FileId,
  98. chunk.Mtime,
  99. ))
  100. }
  101. logPrintf("parallelIntervals loop 3", parallelIntervals)
  102. logPrintf("parallelIntervals loop 3 intervals", intervals)
  103. for len(parallelIntervals) > 0 {
  104. minStopInterval, upToDateInterval = findMinStopInterval(parallelIntervals)
  105. intervals = append(intervals, newVisibleInterval(
  106. max(watermarkStart, minStopInterval.start),
  107. minStopInterval.stop,
  108. upToDateInterval.fileId,
  109. upToDateInterval.modifiedTime,
  110. ))
  111. watermarkStart = minStopInterval.stop
  112. // remove processed intervals, possibly multiple
  113. var remaining []*visibleInterval
  114. for _, interval := range parallelIntervals {
  115. if interval.stop != watermarkStart {
  116. remaining = append(remaining, interval)
  117. }
  118. }
  119. parallelIntervals = remaining
  120. }
  121. logPrintf("parallelIntervals loop 4", parallelIntervals)
  122. logPrintf("intervals", intervals)
  123. // merge connected intervals, now the intervals are non-intersecting
  124. var lastIntervalIndex int
  125. var prevIntervalIndex int
  126. for i, interval := range intervals {
  127. if i == 0 {
  128. prevIntervalIndex = i
  129. lastIntervalIndex = i
  130. continue
  131. }
  132. if intervals[i-1].fileId != interval.fileId ||
  133. intervals[i-1].stop < intervals[i].start {
  134. visibles = append(visibles, newVisibleInterval(
  135. intervals[prevIntervalIndex].start,
  136. intervals[i-1].stop,
  137. intervals[prevIntervalIndex].fileId,
  138. intervals[prevIntervalIndex].modifiedTime,
  139. ))
  140. prevIntervalIndex = i
  141. }
  142. lastIntervalIndex = i
  143. logPrintf("intervals loop 1 visibles", visibles)
  144. }
  145. visibles = append(visibles, newVisibleInterval(
  146. intervals[prevIntervalIndex].start,
  147. intervals[lastIntervalIndex].stop,
  148. intervals[prevIntervalIndex].fileId,
  149. intervals[prevIntervalIndex].modifiedTime,
  150. ))
  151. logPrintf("visibles", visibles)
  152. return
  153. }
  154. func findMinStopInterval(intervals []*visibleInterval) (minStopInterval, upToDateInterval *visibleInterval) {
  155. var latestMtime int64
  156. latestIntervalIndex := 0
  157. minStop := int64(math.MaxInt64)
  158. minIntervalIndex := 0
  159. for i, interval := range intervals {
  160. if minStop > interval.stop {
  161. minIntervalIndex = i
  162. minStop = interval.stop
  163. }
  164. if latestMtime < interval.modifiedTime {
  165. latestMtime = interval.modifiedTime
  166. latestIntervalIndex = i
  167. }
  168. }
  169. minStopInterval = intervals[minIntervalIndex]
  170. upToDateInterval = intervals[latestIntervalIndex]
  171. return
  172. }
  173. // find non-overlapping visible intervals
  174. // visible interval map to one file chunk
  175. type visibleInterval struct {
  176. start int64
  177. stop int64
  178. modifiedTime int64
  179. fileId string
  180. }
  181. func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval {
  182. return &visibleInterval{start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime}
  183. }
  184. func min(x, y int64) int64 {
  185. if x <= y {
  186. return x
  187. }
  188. return y
  189. }
  190. func max(x, y int64) int64 {
  191. if x > y {
  192. return x
  193. }
  194. return y
  195. }