You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
4.2 KiB

  1. package storage
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/util"
  5. "fmt"
  6. "io"
  7. "os"
  8. )
// NeedleMapper is the contract for a needle index: entries are keyed by a
// uint64 and carry a uint32 offset and size. It also exposes usage counters
// and iteration over the stored values.
type NeedleMapper interface {
	// Put records offset and size for key. In the NeedleMap implementation
	// the results are those of the index-file write.
	Put(key uint64, offset uint32, size uint32) (int, error)
	// Get looks up the entry stored for key.
	// NOTE(review): ok presumably reports whether the key was found — confirm
	// against the backing map.
	Get(key uint64) (element *NeedleValue, ok bool)
	// Delete removes the entry for key.
	Delete(key uint64) error
	// Close releases the resources held by the mapper. It reports no error.
	Close()
	// ContentSize returns the accumulated byte size of stored entries.
	ContentSize() uint64
	// DeletedSize returns the accumulated byte size of deleted or replaced entries.
	DeletedSize() uint64
	// FileCount returns the number of entries recorded.
	FileCount() int
	// DeletedCount returns the number of deletions recorded.
	DeletedCount() int
	// Visit calls visit with stored values; a visit error is passed back as err.
	// NOTE(review): whether iteration stops at the first error depends on the
	// implementation — confirm.
	Visit(visit func(NeedleValue) error) (err error)
}
// mapMetric holds the running counters a NeedleMap maintains. The JSON tags
// keep the field names unchanged when the struct is serialized.
type mapMetric struct {
	// DeletionCounter is incremented for every delete and for every Put that
	// replaces an entry with a non-zero size.
	DeletionCounter int `json:"DeletionCounter"`
	// FileCounter is incremented for every Put.
	FileCounter int `json:"FileCounter"`
	// DeletionByteCounter accumulates the sizes of deleted or replaced entries.
	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
	// FileByteCounter accumulates the sizes passed to Put.
	FileByteCounter uint64 `json:"FileByteCounter"`
}
// NeedleMap keeps needle entries in an in-memory CompactMap and writes one
// 16-byte row to indexFile for every Put and Delete.
type NeedleMap struct {
	// indexFile is the file that Put and Delete write rows to.
	indexFile *os.File
	// m is the in-memory table holding offset and size per key.
	m CompactMap
	// transient: bytes is a reusable 16-byte scratch buffer used to encode a
	// single index row before it is written.
	bytes []byte
	// mapMetric is embedded so its counters are accessible directly on NeedleMap.
	mapMetric
}
  33. func NewNeedleMap(file *os.File) *NeedleMap {
  34. nm := &NeedleMap{
  35. m: NewCompactMap(),
  36. bytes: make([]byte, 16),
  37. indexFile: file,
  38. }
  39. return nm
  40. }
const (
	// RowsToRead is the number of 16-byte index rows that walkIndexFile
	// reads into its buffer at a time.
	RowsToRead = 1024
)
  44. func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
  45. nm := NewNeedleMap(file)
  46. e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
  47. nm.FileCounter++
  48. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  49. if offset > 0 {
  50. oldSize := nm.m.Set(Key(key), offset, size)
  51. //glog.V(0).Infoln("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
  52. if oldSize > 0 {
  53. nm.DeletionCounter++
  54. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  55. }
  56. } else {
  57. oldSize := nm.m.Delete(Key(key))
  58. //glog.V(0).Infoln("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
  59. nm.DeletionCounter++
  60. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  61. }
  62. return nil
  63. })
  64. return nm, e
  65. }
  66. // walks through the index file, calls fn function with each key, offset, size
  67. // stops with the error returned by the fn function
  68. func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
  69. br := bufio.NewReaderSize(r, 1024*1024)
  70. bytes := make([]byte, 16*RowsToRead)
  71. count, e := br.Read(bytes)
  72. var (
  73. key uint64
  74. offset, size uint32
  75. i int
  76. )
  77. for count > 0 && e == nil {
  78. for i = 0; i+16 <= count; i += 16 {
  79. key = util.BytesToUint64(bytes[i : i+8])
  80. offset = util.BytesToUint32(bytes[i+8 : i+12])
  81. size = util.BytesToUint32(bytes[i+12 : i+16])
  82. if e = fn(key, offset, size); e != nil {
  83. return e
  84. }
  85. }
  86. if count%16 != 0 {
  87. copy(bytes[:count-i], bytes[i:count])
  88. i = count - i
  89. count, e = br.Read(bytes[i:])
  90. count += i
  91. } else {
  92. count, e = br.Read(bytes)
  93. }
  94. }
  95. if e == io.EOF {
  96. return nil
  97. }
  98. return e
  99. }
  100. func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  101. oldSize := nm.m.Set(Key(key), offset, size)
  102. util.Uint64toBytes(nm.bytes[0:8], key)
  103. util.Uint32toBytes(nm.bytes[8:12], offset)
  104. util.Uint32toBytes(nm.bytes[12:16], size)
  105. nm.FileCounter++
  106. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  107. if oldSize > 0 {
  108. nm.DeletionCounter++
  109. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  110. }
  111. return nm.indexFile.Write(nm.bytes)
  112. }
  113. func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
  114. element, ok = nm.m.Get(Key(key))
  115. return
  116. }
  117. func (nm *NeedleMap) Delete(key uint64) error {
  118. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
  119. offset, err := nm.indexFile.Seek(0, 1)
  120. if err != nil {
  121. return fmt.Errorf("cannot get position of indexfile: %s", err)
  122. }
  123. util.Uint64toBytes(nm.bytes[0:8], key)
  124. util.Uint32toBytes(nm.bytes[8:12], 0)
  125. util.Uint32toBytes(nm.bytes[12:16], 0)
  126. if _, err = nm.indexFile.Write(nm.bytes); err != nil {
  127. plus := ""
  128. if e := nm.indexFile.Truncate(offset); e != nil {
  129. plus = "\ncouldn't truncate index file: " + e.Error()
  130. }
  131. return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus)
  132. }
  133. nm.DeletionCounter++
  134. return nil
  135. }
  136. func (nm *NeedleMap) Close() {
  137. _ = nm.indexFile.Close()
  138. }
  139. func (nm NeedleMap) ContentSize() uint64 {
  140. return nm.FileByteCounter
  141. }
  142. func (nm NeedleMap) DeletedSize() uint64 {
  143. return nm.DeletionByteCounter
  144. }
  145. func (nm NeedleMap) FileCount() int {
  146. return nm.FileCounter
  147. }
  148. func (nm NeedleMap) DeletedCount() int {
  149. return nm.DeletionCounter
  150. }
  151. func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
  152. return nm.m.Visit(visit)
  153. }