You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

132 lines
3.6 KiB

7 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "sync/atomic"
  6. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  7. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "github.com/willf/bloom"
  9. )
  // mapMetric accumulates counters describing a needle map: how many puts and
// deletions were recorded, how many bytes each accounted for, and the largest
// needle key observed. The accessor and Log* methods use sync/atomic on these
// fields.
//
// Keep the field order: encoding/json emits fields in declaration order, and
// putting the two uint32 counters first leaves every uint64 field 8-byte
// aligned, which sync/atomic requires for 64-bit operations on 32-bit
// platforms.
type mapMetric struct {
	// DeletionCounter is the number of recorded deletions.
	DeletionCounter uint32 `json:"DeletionCounter"`
	// FileCounter is the number of recorded puts.
	FileCounter uint32 `json:"FileCounter"`
	// DeletionByteCounter is the sum of sizes recorded as deleted.
	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
	// FileByteCounter is the sum of sizes recorded by puts.
	FileByteCounter uint64 `json:"FileByteCounter"`
	// MaximumFileKey is the largest needle key seen; serialized as "MaxFileKey".
	MaximumFileKey uint64 `json:"MaxFileKey"`
}
  17. func (mm *mapMetric) logDelete(deletedByteCount uint32) {
  18. mm.LogDeletionCounter(deletedByteCount)
  19. }
  20. func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
  21. mm.MaybeSetMaxFileKey(key)
  22. mm.LogFileCounter(newSize)
  23. if oldSize > 0 && oldSize != TombstoneFileSize {
  24. mm.LogDeletionCounter(oldSize)
  25. }
  26. }
  27. func (mm *mapMetric) LogFileCounter(newSize uint32) {
  28. atomic.AddUint32(&mm.FileCounter, 1)
  29. atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
  30. }
  31. func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
  32. if oldSize > 0 {
  33. atomic.AddUint32(&mm.DeletionCounter, 1)
  34. atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
  35. }
  36. }
  37. func (mm *mapMetric) ContentSize() uint64 {
  38. return atomic.LoadUint64(&mm.FileByteCounter)
  39. }
  40. func (mm *mapMetric) DeletedSize() uint64 {
  41. return atomic.LoadUint64(&mm.DeletionByteCounter)
  42. }
  43. func (mm *mapMetric) FileCount() int {
  44. return int(atomic.LoadUint32(&mm.FileCounter))
  45. }
  46. func (mm *mapMetric) DeletedCount() int {
  47. return int(atomic.LoadUint32(&mm.DeletionCounter))
  48. }
  49. func (mm *mapMetric) MaxFileKey() NeedleId {
  50. t := uint64(mm.MaximumFileKey)
  51. return Uint64ToNeedleId(t)
  52. }
  53. func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
  54. if key > mm.MaxFileKey() {
  55. atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
  56. }
  57. }
  58. func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
  59. mm = &mapMetric{}
  60. var bf *bloom.BloomFilter
  61. buf := make([]byte, NeedleIdSize)
  62. err = reverseWalkIndexFile(r, func(entryCount int64) {
  63. bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
  64. }, func(key NeedleId, offset Offset, size uint32) error {
  65. mm.MaybeSetMaxFileKey(key)
  66. NeedleIdToBytes(buf, key)
  67. if size != TombstoneFileSize {
  68. mm.FileByteCounter += uint64(size)
  69. }
  70. if !bf.Test(buf) {
  71. mm.FileCounter++
  72. bf.Add(buf)
  73. } else {
  74. // deleted file
  75. mm.DeletionCounter++
  76. if size != TombstoneFileSize {
  77. // previously already deleted file
  78. mm.DeletionByteCounter += uint64(size)
  79. }
  80. }
  81. return nil
  82. })
  83. return
  84. }
  85. func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error {
  86. fi, err := r.Stat()
  87. if err != nil {
  88. return fmt.Errorf("file %s stat error: %v", r.Name(), err)
  89. }
  90. fileSize := fi.Size()
  91. if fileSize%NeedleMapEntrySize != 0 {
  92. return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
  93. }
  94. entryCount := fileSize / NeedleMapEntrySize
  95. initFn(entryCount)
  96. batchSize := int64(1024 * 4)
  97. bytes := make([]byte, NeedleMapEntrySize*batchSize)
  98. nextBatchSize := entryCount % batchSize
  99. if nextBatchSize == 0 {
  100. nextBatchSize = batchSize
  101. }
  102. remainingCount := entryCount - nextBatchSize
  103. for remainingCount >= 0 {
  104. _, e := r.ReadAt(bytes[:NeedleMapEntrySize*nextBatchSize], NeedleMapEntrySize*remainingCount)
  105. // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleMapEntrySize*remainingCount, "count", count, "e", e)
  106. if e != nil {
  107. return e
  108. }
  109. for i := int(nextBatchSize) - 1; i >= 0; i-- {
  110. key, offset, size := idx.IdxFileEntry(bytes[i*NeedleMapEntrySize : i*NeedleMapEntrySize+NeedleMapEntrySize])
  111. if e = fn(key, offset, size); e != nil {
  112. return e
  113. }
  114. }
  115. nextBatchSize = batchSize
  116. remainingCount -= nextBatchSize
  117. }
  118. return nil
  119. }