You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

183 lines
4.7 KiB

  1. package storage
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/util"
  6. "fmt"
  7. "io"
  8. "os"
  9. )
  10. type NeedleMapper interface {
  11. Put(key uint64, offset uint32, size uint32) (int, error)
  12. Get(key uint64) (element *NeedleValue, ok bool)
  13. Delete(key uint64) error
  14. Close()
  15. ContentSize() uint64
  16. DeletedSize() uint64
  17. FileCount() int
  18. DeletedCount() int
  19. Visit(visit func(NeedleValue) error) (err error)
  20. NextFileKey(count int) uint64
  21. }
  22. type mapMetric struct {
  23. DeletionCounter int `json:"DeletionCounter"`
  24. FileCounter int `json:"FileCounter"`
  25. DeletionByteCounter uint64 `json:"DeletionByteCounter"`
  26. FileByteCounter uint64 `json:"FileByteCounter"`
  27. MaximumFileKey uint64 `json:"MaxFileKey"`
  28. }
  29. type NeedleMap struct {
  30. indexFile *os.File
  31. m CompactMap
  32. //transient
  33. bytes []byte
  34. mapMetric
  35. }
  36. func NewNeedleMap(file *os.File) *NeedleMap {
  37. nm := &NeedleMap{
  38. m: NewCompactMap(),
  39. bytes: make([]byte, 16),
  40. indexFile: file,
  41. }
  42. return nm
  43. }
  44. const (
  45. RowsToRead = 1024
  46. )
  47. func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
  48. nm := NewNeedleMap(file)
  49. e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
  50. if key > nm.MaximumFileKey {
  51. nm.MaximumFileKey = key
  52. }
  53. nm.FileCounter++
  54. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  55. if offset > 0 {
  56. oldSize := nm.m.Set(Key(key), offset, size)
  57. glog.V(4).Infoln("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
  58. if oldSize > 0 {
  59. nm.DeletionCounter++
  60. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  61. }
  62. } else {
  63. oldSize := nm.m.Delete(Key(key))
  64. glog.V(4).Infoln("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
  65. nm.DeletionCounter++
  66. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  67. }
  68. return nil
  69. })
  70. glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
  71. return nm, e
  72. }
  73. // walks through the index file, calls fn function with each key, offset, size
  74. // stops with the error returned by the fn function
  75. func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
  76. br := bufio.NewReaderSize(r, 1024*1024)
  77. bytes := make([]byte, 16*RowsToRead)
  78. count, e := br.Read(bytes)
  79. var (
  80. key uint64
  81. offset, size uint32
  82. i int
  83. )
  84. for count > 0 && e == nil {
  85. for i = 0; i+16 <= count; i += 16 {
  86. key = util.BytesToUint64(bytes[i : i+8])
  87. offset = util.BytesToUint32(bytes[i+8 : i+12])
  88. size = util.BytesToUint32(bytes[i+12 : i+16])
  89. if e = fn(key, offset, size); e != nil {
  90. return e
  91. }
  92. }
  93. if count%16 != 0 {
  94. copy(bytes[:count-i], bytes[i:count])
  95. i = count - i
  96. count, e = br.Read(bytes[i:])
  97. count += i
  98. } else {
  99. count, e = br.Read(bytes)
  100. }
  101. }
  102. if e == io.EOF {
  103. return nil
  104. }
  105. return e
  106. }
  107. func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  108. oldSize := nm.m.Set(Key(key), offset, size)
  109. util.Uint64toBytes(nm.bytes[0:8], key)
  110. util.Uint32toBytes(nm.bytes[8:12], offset)
  111. util.Uint32toBytes(nm.bytes[12:16], size)
  112. nm.FileCounter++
  113. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  114. if oldSize > 0 {
  115. nm.DeletionCounter++
  116. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  117. }
  118. return nm.indexFile.Write(nm.bytes)
  119. }
  120. func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
  121. element, ok = nm.m.Get(Key(key))
  122. return
  123. }
  124. func (nm *NeedleMap) Delete(key uint64) error {
  125. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
  126. offset, err := nm.indexFile.Seek(0, 1)
  127. if err != nil {
  128. return fmt.Errorf("cannot get position of indexfile: %s", err)
  129. }
  130. util.Uint64toBytes(nm.bytes[0:8], key)
  131. util.Uint32toBytes(nm.bytes[8:12], 0)
  132. util.Uint32toBytes(nm.bytes[12:16], 0)
  133. if _, err = nm.indexFile.Write(nm.bytes); err != nil {
  134. plus := ""
  135. if e := nm.indexFile.Truncate(offset); e != nil {
  136. plus = "\ncouldn't truncate index file: " + e.Error()
  137. }
  138. return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus)
  139. }
  140. nm.DeletionCounter++
  141. return nil
  142. }
  143. func (nm *NeedleMap) Close() {
  144. _ = nm.indexFile.Close()
  145. }
  146. func (nm NeedleMap) ContentSize() uint64 {
  147. return nm.FileByteCounter
  148. }
  149. func (nm NeedleMap) DeletedSize() uint64 {
  150. return nm.DeletionByteCounter
  151. }
  152. func (nm NeedleMap) FileCount() int {
  153. return nm.FileCounter
  154. }
  155. func (nm NeedleMap) DeletedCount() int {
  156. return nm.DeletionCounter
  157. }
  158. func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
  159. return nm.m.Visit(visit)
  160. }
  161. func (nm NeedleMap) MaxFileKey() uint64 {
  162. return nm.MaximumFileKey
  163. }
  164. func (nm NeedleMap) NextFileKey(count int) (ret uint64) {
  165. if count <= 0 {
  166. return 0
  167. }
  168. ret = nm.MaximumFileKey
  169. nm.MaximumFileKey += uint64(count)
  170. return
  171. }