170 lines
4.1 KiB

4 years ago
3 years ago
3 years ago
4 years ago
4 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "sync/atomic"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  7. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  8. boom "github.com/tylertreat/BoomFilters"
  9. )
// mapMetric holds the counters this file maintains for a needle map: entry
// and deletion counts, their byte totals, and the largest key observed.
// Most methods in this file access the fields through sync/atomic;
// needleMapMetricFromIndexFile mutates them directly while scanning an index.
type mapMetric struct {
	DeletionCounter     uint32 `json:"DeletionCounter"`     // number of deletions logged
	FileCounter         uint32 `json:"FileCounter"`         // number of entries logged
	DeletionByteCounter uint64 `json:"DeletionByteCounter"` // sum of sizes passed to deletion logging
	FileByteCounter     uint64 `json:"FileByteCounter"`     // sum of sizes passed to entry logging
	MaximumFileKey      uint64 `json:"MaxFileKey"`          // largest key seen; note the JSON name is "MaxFileKey"
}
  17. func (mm *mapMetric) logDelete(deletedByteCount Size) {
  18. if mm == nil {
  19. return
  20. }
  21. mm.LogDeletionCounter(deletedByteCount)
  22. }
  23. func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
  24. if mm == nil {
  25. return
  26. }
  27. mm.MaybeSetMaxFileKey(key)
  28. mm.LogFileCounter(newSize)
  29. if oldSize > 0 && oldSize.IsValid() {
  30. mm.LogDeletionCounter(oldSize)
  31. }
  32. }
  33. func (mm *mapMetric) LogFileCounter(newSize Size) {
  34. if mm == nil {
  35. return
  36. }
  37. atomic.AddUint32(&mm.FileCounter, 1)
  38. atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
  39. }
  40. func (mm *mapMetric) LogDeletionCounter(oldSize Size) {
  41. if mm == nil {
  42. return
  43. }
  44. if oldSize > 0 {
  45. atomic.AddUint32(&mm.DeletionCounter, 1)
  46. atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
  47. }
  48. }
  49. func (mm *mapMetric) ContentSize() uint64 {
  50. if mm == nil {
  51. return 0
  52. }
  53. return atomic.LoadUint64(&mm.FileByteCounter)
  54. }
  55. func (mm *mapMetric) DeletedSize() uint64 {
  56. if mm == nil {
  57. return 0
  58. }
  59. return atomic.LoadUint64(&mm.DeletionByteCounter)
  60. }
  61. func (mm *mapMetric) FileCount() int {
  62. if mm == nil {
  63. return 0
  64. }
  65. return int(atomic.LoadUint32(&mm.FileCounter))
  66. }
  67. func (mm *mapMetric) DeletedCount() int {
  68. if mm == nil {
  69. return 0
  70. }
  71. return int(atomic.LoadUint32(&mm.DeletionCounter))
  72. }
  73. func (mm *mapMetric) MaxFileKey() NeedleId {
  74. if mm == nil {
  75. return 0
  76. }
  77. t := uint64(mm.MaximumFileKey)
  78. return Uint64ToNeedleId(t)
  79. }
  80. func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
  81. if mm == nil {
  82. return
  83. }
  84. if key > mm.MaxFileKey() {
  85. atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
  86. }
  87. }
  88. func needleMapMetricFromIndexFile(r *os.File, mm *mapMetric) error {
  89. var bf *boom.BloomFilter
  90. buf := make([]byte, NeedleIdSize)
  91. err := reverseWalkIndexFile(r, func(entryCount int64) {
  92. bf = boom.NewBloomFilter(uint(entryCount), 0.001)
  93. }, func(key NeedleId, offset Offset, size Size) error {
  94. mm.MaybeSetMaxFileKey(key)
  95. NeedleIdToBytes(buf, key)
  96. if size.IsValid() {
  97. mm.FileByteCounter += uint64(size)
  98. }
  99. mm.FileCounter++
  100. if !bf.TestAndAdd(buf) {
  101. // if !size.IsValid(), then this file is deleted already
  102. if !size.IsValid() {
  103. mm.DeletionCounter++
  104. }
  105. } else {
  106. // deleted file
  107. mm.DeletionCounter++
  108. if size.IsValid() {
  109. // previously already deleted file
  110. mm.DeletionByteCounter += uint64(size)
  111. }
  112. }
  113. return nil
  114. })
  115. return err
  116. }
  117. func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
  118. mm = &mapMetric{}
  119. err = needleMapMetricFromIndexFile(r, mm)
  120. return
  121. }
  122. func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error {
  123. fi, err := r.Stat()
  124. if err != nil {
  125. return fmt.Errorf("file %s stat error: %v", r.Name(), err)
  126. }
  127. fileSize := fi.Size()
  128. if fileSize%NeedleMapEntrySize != 0 {
  129. return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
  130. }
  131. entryCount := fileSize / NeedleMapEntrySize
  132. initFn(entryCount)
  133. batchSize := int64(1024 * 4)
  134. bytes := make([]byte, NeedleMapEntrySize*batchSize)
  135. nextBatchSize := entryCount % batchSize
  136. if nextBatchSize == 0 {
  137. nextBatchSize = batchSize
  138. }
  139. remainingCount := entryCount - nextBatchSize
  140. for remainingCount >= 0 {
  141. _, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
  142. // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
  143. if e != nil {
  144. return e
  145. }
  146. for i := int(nextBatchSize) - 1; i >= 0; i-- {
  147. key, offset, size := idx.IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
  148. if e = fn(key, offset, size); e != nil {
  149. return e
  150. }
  151. }
  152. nextBatchSize = batchSize
  153. remainingCount -= nextBatchSize
  154. }
  155. return nil
  156. }