You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

174 lines
4.2 KiB

4 years ago
3 years ago
3 years ago
4 years ago
4 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "sync/atomic"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  8. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. boom "github.com/tylertreat/BoomFilters"
  10. )
  // mapMetric accumulates counters describing the entries of a needle map.
  //
  // The counters are updated through sync/atomic by the methods of this type.
  // The field order is deliberate: the two uint32 counters fill the first eight
  // bytes, which keeps the uint64 fields that follow on an 8-byte boundary.
  type mapMetric struct {
  	// DeletionCounter is the number of recorded deletions/overwrites.
  	DeletionCounter uint32 `json:"DeletionCounter"`
  	// FileCounter is the number of recorded puts.
  	FileCounter uint32 `json:"FileCounter"`
  	// DeletionByteCounter is the total size of deleted/overwritten entries.
  	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
  	// FileByteCounter is the total size of recorded puts.
  	FileByteCounter uint64 `json:"FileByteCounter"`
  	// MaximumFileKey is the largest key observed so far.
  	MaximumFileKey uint64 `json:"MaxFileKey"`
  }
  18. func (mm *mapMetric) logDelete(deletedByteCount Size) {
  19. if mm == nil {
  20. return
  21. }
  22. mm.LogDeletionCounter(deletedByteCount)
  23. }
  24. func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
  25. if mm == nil {
  26. return
  27. }
  28. mm.MaybeSetMaxFileKey(key)
  29. mm.LogFileCounter(newSize)
  30. if oldSize > 0 && oldSize.IsValid() {
  31. mm.LogDeletionCounter(oldSize)
  32. }
  33. }
  34. func (mm *mapMetric) LogFileCounter(newSize Size) {
  35. if mm == nil {
  36. return
  37. }
  38. atomic.AddUint32(&mm.FileCounter, 1)
  39. atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
  40. }
  41. func (mm *mapMetric) LogDeletionCounter(oldSize Size) {
  42. if mm == nil {
  43. return
  44. }
  45. if oldSize > 0 {
  46. atomic.AddUint32(&mm.DeletionCounter, 1)
  47. atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
  48. }
  49. }
  50. func (mm *mapMetric) ContentSize() uint64 {
  51. if mm == nil {
  52. return 0
  53. }
  54. return atomic.LoadUint64(&mm.FileByteCounter)
  55. }
  56. func (mm *mapMetric) DeletedSize() uint64 {
  57. if mm == nil {
  58. return 0
  59. }
  60. return atomic.LoadUint64(&mm.DeletionByteCounter)
  61. }
  62. func (mm *mapMetric) FileCount() int {
  63. if mm == nil {
  64. return 0
  65. }
  66. return int(atomic.LoadUint32(&mm.FileCounter))
  67. }
  68. func (mm *mapMetric) DeletedCount() int {
  69. if mm == nil {
  70. return 0
  71. }
  72. return int(atomic.LoadUint32(&mm.DeletionCounter))
  73. }
  74. func (mm *mapMetric) MaxFileKey() NeedleId {
  75. if mm == nil {
  76. return 0
  77. }
  78. t := uint64(mm.MaximumFileKey)
  79. return Uint64ToNeedleId(t)
  80. }
  81. func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
  82. if mm == nil {
  83. return
  84. }
  85. if key > mm.MaxFileKey() {
  86. atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
  87. }
  88. }
  89. func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error {
  90. var bf *boom.BloomFilter
  91. buf := make([]byte, NeedleIdSize)
  92. err := reverseWalkIndexFile(r, func(entryCount int64) {
  93. bf = boom.NewBloomFilter(uint(entryCount), 0.001)
  94. }, func(key NeedleId, offset Offset, size Size) error {
  95. mm.MaybeSetMaxFileKey(key)
  96. NeedleIdToBytes(buf, key)
  97. if size.IsValid() {
  98. mm.FileByteCounter += uint64(size)
  99. }
  100. mm.FileCounter++
  101. if !bf.TestAndAdd(buf) {
  102. // if !size.IsValid(), then this file is deleted already
  103. if !size.IsValid() {
  104. mm.DeletionCounter++
  105. }
  106. } else {
  107. // deleted file
  108. mm.DeletionCounter++
  109. if size.IsValid() {
  110. // previously already deleted file
  111. mm.DeletionByteCounter += uint64(size)
  112. }
  113. }
  114. return nil
  115. })
  116. return err
  117. }
  118. func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
  119. mm = &mapMetric{}
  120. err = needleMapMetricFromIndexFile(r, mm)
  121. return
  122. }
  123. func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error {
  124. fi, err := r.Stat()
  125. if err != nil {
  126. return fmt.Errorf("file %s stat error: %v", r.Name(), err)
  127. }
  128. fileSize := fi.Size()
  129. if fileSize%NeedleMapEntrySize != 0 {
  130. return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
  131. }
  132. entryCount := fileSize / NeedleMapEntrySize
  133. initFn(entryCount)
  134. batchSize := int64(1024 * 4)
  135. bytes := make([]byte, NeedleMapEntrySize*batchSize)
  136. nextBatchSize := entryCount % batchSize
  137. if nextBatchSize == 0 {
  138. nextBatchSize = batchSize
  139. }
  140. remainingCount := entryCount - nextBatchSize
  141. for remainingCount >= 0 {
  142. n, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
  143. // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
  144. if e == io.EOF && n == int(NeedleMapEntrySize*nextBatchSize) {
  145. e = nil
  146. }
  147. if e != nil {
  148. return e
  149. }
  150. for i := int(nextBatchSize) - 1; i >= 0; i-- {
  151. key, offset, size := idx.IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
  152. if e = fn(key, offset, size); e != nil {
  153. return e
  154. }
  155. }
  156. nextBatchSize = batchSize
  157. remainingCount -= nextBatchSize
  158. }
  159. return nil
  160. }