You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

188 lines
4.8 KiB

  1. package storage
  2. import (
  3. "bufio"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/util"
  6. "fmt"
  7. "io"
  8. "os"
  9. )
  10. type NeedleMapper interface {
  11. Put(key uint64, offset uint32, size uint32) (int, error)
  12. Get(key uint64) (element *NeedleValue, ok bool)
  13. Delete(key uint64) error
  14. Close()
  15. Destroy() error
  16. ContentSize() uint64
  17. DeletedSize() uint64
  18. FileCount() int
  19. DeletedCount() int
  20. Visit(visit func(NeedleValue) error) (err error)
  21. NextFileKey(count int) uint64
  22. }
  // mapMetric holds the running counters kept for a needle map. It is embedded
  // in NeedleMap, and each field is serialized under the JSON name in its tag.
  type mapMetric struct {
  	// DeletionCounter counts recorded deletions and overwrites.
  	DeletionCounter int `json:"DeletionCounter"`
  	// FileCounter counts entries that were put or replayed from the index.
  	FileCounter int `json:"FileCounter"`
  	// DeletionByteCounter sums the sizes of deleted or overwritten entries.
  	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
  	// FileByteCounter sums the sizes of entries that were put or replayed.
  	FileByteCounter uint64 `json:"FileByteCounter"`
  	// MaximumFileKey is the key high-water mark; note its JSON name is "MaxFileKey".
  	MaximumFileKey uint64 `json:"MaxFileKey"`
  }
  30. type NeedleMap struct {
  31. indexFile *os.File
  32. m CompactMap
  33. //transient
  34. bytes []byte
  35. mapMetric
  36. }
  37. func NewNeedleMap(file *os.File) *NeedleMap {
  38. nm := &NeedleMap{
  39. m: NewCompactMap(),
  40. bytes: make([]byte, 16),
  41. indexFile: file,
  42. }
  43. return nm
  44. }
  // RowsToRead is the number of 16-byte index rows that walkIndexFile's read
  // buffer holds.
  const RowsToRead = 1024
  48. func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
  49. nm := NewNeedleMap(file)
  50. e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
  51. if key > nm.MaximumFileKey {
  52. nm.MaximumFileKey = key
  53. }
  54. nm.FileCounter++
  55. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  56. if offset > 0 {
  57. oldSize := nm.m.Set(Key(key), offset, size)
  58. glog.V(4).Infoln("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
  59. if oldSize > 0 {
  60. nm.DeletionCounter++
  61. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  62. }
  63. } else {
  64. oldSize := nm.m.Delete(Key(key))
  65. glog.V(4).Infoln("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
  66. nm.DeletionCounter++
  67. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  68. }
  69. return nil
  70. })
  71. glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
  72. return nm, e
  73. }
  74. // walks through the index file, calls fn function with each key, offset, size
  75. // stops with the error returned by the fn function
  76. func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
  77. br := bufio.NewReaderSize(r, 1024*1024)
  78. bytes := make([]byte, 16*RowsToRead)
  79. count, e := br.Read(bytes)
  80. var (
  81. key uint64
  82. offset, size uint32
  83. i int
  84. )
  85. for count > 0 && e == nil {
  86. for i = 0; i+16 <= count; i += 16 {
  87. key = util.BytesToUint64(bytes[i : i+8])
  88. offset = util.BytesToUint32(bytes[i+8 : i+12])
  89. size = util.BytesToUint32(bytes[i+12 : i+16])
  90. if e = fn(key, offset, size); e != nil {
  91. return e
  92. }
  93. }
  94. if count%16 != 0 {
  95. copy(bytes[:count-i], bytes[i:count])
  96. i = count - i
  97. count, e = br.Read(bytes[i:])
  98. count += i
  99. } else {
  100. count, e = br.Read(bytes)
  101. }
  102. }
  103. if e == io.EOF {
  104. return nil
  105. }
  106. return e
  107. }
  108. func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  109. oldSize := nm.m.Set(Key(key), offset, size)
  110. util.Uint64toBytes(nm.bytes[0:8], key)
  111. util.Uint32toBytes(nm.bytes[8:12], offset)
  112. util.Uint32toBytes(nm.bytes[12:16], size)
  113. nm.FileCounter++
  114. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  115. if oldSize > 0 {
  116. nm.DeletionCounter++
  117. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  118. }
  119. return nm.indexFile.Write(nm.bytes)
  120. }
  121. func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
  122. element, ok = nm.m.Get(Key(key))
  123. return
  124. }
  125. func (nm *NeedleMap) Delete(key uint64) error {
  126. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
  127. offset, err := nm.indexFile.Seek(0, 1)
  128. if err != nil {
  129. return fmt.Errorf("cannot get position of indexfile: %s", err)
  130. }
  131. util.Uint64toBytes(nm.bytes[0:8], key)
  132. util.Uint32toBytes(nm.bytes[8:12], 0)
  133. util.Uint32toBytes(nm.bytes[12:16], 0)
  134. if _, err = nm.indexFile.Write(nm.bytes); err != nil {
  135. plus := ""
  136. if e := nm.indexFile.Truncate(offset); e != nil {
  137. plus = "\ncouldn't truncate index file: " + e.Error()
  138. }
  139. return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus)
  140. }
  141. nm.DeletionCounter++
  142. return nil
  143. }
  144. func (nm *NeedleMap) Close() {
  145. _ = nm.indexFile.Close()
  146. }
  147. func (nm *NeedleMap) Destroy() error {
  148. nm.Close()
  149. return os.Remove(nm.indexFile.Name())
  150. }
  151. func (nm NeedleMap) ContentSize() uint64 {
  152. return nm.FileByteCounter
  153. }
  154. func (nm NeedleMap) DeletedSize() uint64 {
  155. return nm.DeletionByteCounter
  156. }
  157. func (nm NeedleMap) FileCount() int {
  158. return nm.FileCounter
  159. }
  160. func (nm NeedleMap) DeletedCount() int {
  161. return nm.DeletionCounter
  162. }
  163. func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
  164. return nm.m.Visit(visit)
  165. }
  166. func (nm NeedleMap) MaxFileKey() uint64 {
  167. return nm.MaximumFileKey
  168. }
  169. func (nm NeedleMap) NextFileKey(count int) (ret uint64) {
  170. if count <= 0 {
  171. return 0
  172. }
  173. ret = nm.MaximumFileKey
  174. nm.MaximumFileKey += uint64(count)
  175. return
  176. }