You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

121 lines
3.2 KiB

7 years ago
7 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  5. "github.com/willf/bloom"
  6. "os"
  7. )
  8. type mapMetric struct {
  9. DeletionCounter int `json:"DeletionCounter"`
  10. FileCounter int `json:"FileCounter"`
  11. DeletionByteCounter uint64 `json:"DeletionByteCounter"`
  12. FileByteCounter uint64 `json:"FileByteCounter"`
  13. MaximumFileKey NeedleId `json:"MaxFileKey"`
  14. }
  15. func (mm *mapMetric) logDelete(deletedByteCount uint32) {
  16. mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
  17. mm.DeletionCounter++
  18. }
  19. func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
  20. if key > mm.MaximumFileKey {
  21. mm.MaximumFileKey = key
  22. }
  23. mm.FileCounter++
  24. mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
  25. if oldSize > 0 {
  26. mm.DeletionCounter++
  27. mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
  28. }
  29. }
  30. func (mm mapMetric) ContentSize() uint64 {
  31. return mm.FileByteCounter
  32. }
  33. func (mm mapMetric) DeletedSize() uint64 {
  34. return mm.DeletionByteCounter
  35. }
  36. func (mm mapMetric) FileCount() int {
  37. return mm.FileCounter
  38. }
  39. func (mm mapMetric) DeletedCount() int {
  40. return mm.DeletionCounter
  41. }
  42. func (mm mapMetric) MaxFileKey() NeedleId {
  43. return mm.MaximumFileKey
  44. }
  45. func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
  46. mm = &mapMetric{}
  47. var bf *bloom.BloomFilter
  48. buf := make([]byte, NeedleIdSize)
  49. err = reverseWalkIndexFile(r, func(entryCount int64) {
  50. bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
  51. }, func(key NeedleId, offset Offset, size uint32) error {
  52. if key > mm.MaximumFileKey {
  53. mm.MaximumFileKey = key
  54. }
  55. NeedleIdToBytes(buf, key)
  56. if size != TombstoneFileSize {
  57. mm.FileByteCounter += uint64(size)
  58. }
  59. if !bf.Test(buf) {
  60. mm.FileCounter++
  61. bf.Add(buf)
  62. } else {
  63. // deleted file
  64. mm.DeletionCounter++
  65. if size != TombstoneFileSize {
  66. // previously already deleted file
  67. mm.DeletionByteCounter += uint64(size)
  68. }
  69. }
  70. return nil
  71. })
  72. return
  73. }
  74. func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error {
  75. fi, err := r.Stat()
  76. if err != nil {
  77. return fmt.Errorf("file %s stat error: %v", r.Name(), err)
  78. }
  79. fileSize := fi.Size()
  80. if fileSize%NeedleEntrySize != 0 {
  81. return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
  82. }
  83. entryCount := fileSize / NeedleEntrySize
  84. initFn(entryCount)
  85. batchSize := int64(1024 * 4)
  86. bytes := make([]byte, NeedleEntrySize*batchSize)
  87. nextBatchSize := entryCount % batchSize
  88. if nextBatchSize == 0 {
  89. nextBatchSize = batchSize
  90. }
  91. remainingCount := entryCount - nextBatchSize
  92. for remainingCount >= 0 {
  93. _, e := r.ReadAt(bytes[:NeedleEntrySize*nextBatchSize], NeedleEntrySize*remainingCount)
  94. // glog.V(0).Infoln("file", r.Name(), "readerOffset", NeedleEntrySize*remainingCount, "count", count, "e", e)
  95. if e != nil {
  96. return e
  97. }
  98. for i := int(nextBatchSize) - 1; i >= 0; i-- {
  99. key, offset, size := IdxFileEntry(bytes[i*NeedleEntrySize : i*NeedleEntrySize+NeedleEntrySize])
  100. if e = fn(key, offset, size); e != nil {
  101. return e
  102. }
  103. }
  104. nextBatchSize = batchSize
  105. remainingCount -= nextBatchSize
  106. }
  107. return nil
  108. }