You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
2.9 KiB

  1. package storage
  2. import (
  3. "io"
  4. "os"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. )
  7. type NeedleMap struct {
  8. m CompactMap
  9. baseNeedleMapper
  10. }
  11. func NewNeedleMap(file *os.File) *NeedleMap {
  12. nm := &NeedleMap{
  13. m: NewCompactMap(),
  14. }
  15. nm.indexFile = file
  16. return nm
  17. }
  // RowsToRead is the number of 16-byte index entries that WalkIndexFile
  // requests from the index file in a single read.
  const RowsToRead = 1024
  21. func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
  22. nm := NewNeedleMap(file)
  23. e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
  24. if key > nm.MaximumFileKey {
  25. nm.MaximumFileKey = key
  26. }
  27. if offset > 0 && size != TombstoneFileSize {
  28. nm.FileCounter++
  29. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  30. oldOffset, oldSize := nm.m.Set(Key(key), offset, size)
  31. glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
  32. if oldOffset > 0 && oldSize != TombstoneFileSize {
  33. nm.DeletionCounter++
  34. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  35. }
  36. } else {
  37. oldSize := nm.m.Delete(Key(key))
  38. glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
  39. nm.DeletionCounter++
  40. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  41. }
  42. return nil
  43. })
  44. glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name())
  45. return nm, e
  46. }
  47. // walks through the index file, calls fn function with each key, offset, size
  48. // stops with the error returned by the fn function
  49. func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
  50. var readerOffset int64
  51. bytes := make([]byte, 16*RowsToRead)
  52. count, e := r.ReadAt(bytes, readerOffset)
  53. glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
  54. readerOffset += int64(count)
  55. var (
  56. key uint64
  57. offset, size uint32
  58. i int
  59. )
  60. for count > 0 && e == nil || e == io.EOF {
  61. for i = 0; i+16 <= count; i += 16 {
  62. key, offset, size = idxFileEntry(bytes[i : i+16])
  63. if e = fn(key, offset, size); e != nil {
  64. return e
  65. }
  66. }
  67. if e == io.EOF {
  68. return nil
  69. }
  70. count, e = r.ReadAt(bytes, readerOffset)
  71. glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
  72. readerOffset += int64(count)
  73. }
  74. return e
  75. }
  76. func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
  77. _, oldSize := nm.m.Set(Key(key), offset, size)
  78. nm.logPut(key, oldSize, size)
  79. return nm.appendToIndexFile(key, offset, size)
  80. }
  81. func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
  82. element, ok = nm.m.Get(Key(key))
  83. return
  84. }
  85. func (nm *NeedleMap) Delete(key uint64, offset uint32) error {
  86. deletedBytes := nm.m.Delete(Key(key))
  87. nm.logDelete(deletedBytes)
  88. return nm.appendToIndexFile(key, offset, TombstoneFileSize)
  89. }
  90. func (nm *NeedleMap) Close() {
  91. _ = nm.indexFile.Close()
  92. }
  93. func (nm *NeedleMap) Destroy() error {
  94. nm.Close()
  95. return os.Remove(nm.indexFile.Name())
  96. }