You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
4.8 KiB

  1. package storage
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/util"
  5. "fmt"
  6. "io"
  7. "os"
  8. )
  9. type NeedleMapper interface {
  10. Put(key uint64, offset uint32, size uint32) (int, error)
  11. Get(key uint64) (element *NeedleValue, ok bool)
  12. Delete(key uint64) error
  13. Close()
  14. Destroy() error
  15. ContentSize() uint64
  16. DeletedSize() uint64
  17. FileCount() int
  18. DeletedCount() int
  19. Visit(visit func(NeedleValue) error) (err error)
  20. MaxFileKey() uint64
  21. }
  22. type mapMetric struct {
  23. DeletionCounter int `json:"DeletionCounter"`
  24. FileCounter int `json:"FileCounter"`
  25. DeletionByteCounter uint64 `json:"DeletionByteCounter"`
  26. FileByteCounter uint64 `json:"FileByteCounter"`
  27. MaximumFileKey uint64 `json:"MaxFileKey"`
  28. }
  29. type NeedleMap struct {
  30. indexFile *os.File
  31. m CompactMap
  32. mapMetric
  33. }
  34. func NewNeedleMap(file *os.File) *NeedleMap {
  35. nm := &NeedleMap{
  36. m: NewCompactMap(),
  37. indexFile: file,
  38. }
  39. return nm
  40. }
  41. const (
  42. RowsToRead = 1024
  43. )
  44. func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
  45. nm := NewNeedleMap(file)
  46. e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
  47. if key > nm.MaximumFileKey {
  48. nm.MaximumFileKey = key
  49. }
  50. nm.FileCounter++
  51. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  52. if offset > 0 {
  53. oldSize := nm.m.Set(Key(key), offset, size)
  54. glog.V(3).Infoln("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
  55. if oldSize > 0 {
  56. nm.DeletionCounter++
  57. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  58. }
  59. } else {
  60. oldSize := nm.m.Delete(Key(key))
  61. glog.V(3).Infoln("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
  62. nm.DeletionCounter++
  63. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  64. }
  65. return nil
  66. })
  67. glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
  68. return nm, e
  69. }
  70. // walks through the index file, calls fn function with each key, offset, size
  71. // stops with the error returned by the fn function
  72. func walkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
  73. var readerOffset int64
  74. bytes := make([]byte, 16*RowsToRead)
  75. count, e := r.ReadAt(bytes, readerOffset)
  76. glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
  77. readerOffset += int64(count)
  78. var (
  79. key uint64
  80. offset, size uint32
  81. i int
  82. )
  83. for count > 0 && e == nil || e == io.EOF {
  84. for i = 0; i+16 <= count; i += 16 {
  85. key = util.BytesToUint64(bytes[i : i+8])
  86. offset = util.BytesToUint32(bytes[i+8 : i+12])
  87. size = util.BytesToUint32(bytes[i+12 : i+16])
  88. if e = fn(key, offset, size); e != nil {
  89. return e
  90. }
  91. }
  92. if e == io.EOF {
  93. return nil
  94. }
  95. count, e = r.ReadAt(bytes, readerOffset)
  96. glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
  97. readerOffset += int64(count)
  98. }
  99. return e
  100. }
  101. func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
  102. if key > nm.MaximumFileKey {
  103. nm.MaximumFileKey = key
  104. }
  105. oldSize := nm.m.Set(Key(key), offset, size)
  106. bytes := make([]byte, 16)
  107. util.Uint64toBytes(bytes[0:8], key)
  108. util.Uint32toBytes(bytes[8:12], offset)
  109. util.Uint32toBytes(bytes[12:16], size)
  110. nm.FileCounter++
  111. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  112. if oldSize > 0 {
  113. nm.DeletionCounter++
  114. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  115. }
  116. return nm.indexFile.Write(bytes)
  117. }
  118. func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
  119. element, ok = nm.m.Get(Key(key))
  120. return
  121. }
  122. func (nm *NeedleMap) Delete(key uint64) error {
  123. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
  124. offset, err := nm.indexFile.Seek(0, 1)
  125. if err != nil {
  126. return fmt.Errorf("cannot get position of indexfile: %s", err)
  127. }
  128. bytes := make([]byte, 16)
  129. util.Uint64toBytes(bytes[0:8], key)
  130. util.Uint32toBytes(bytes[8:12], 0)
  131. util.Uint32toBytes(bytes[12:16], 0)
  132. if _, err = nm.indexFile.Write(bytes); err != nil {
  133. plus := ""
  134. if e := nm.indexFile.Truncate(offset); e != nil {
  135. plus = "\ncouldn't truncate index file: " + e.Error()
  136. }
  137. return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile.Name(), err, plus)
  138. }
  139. nm.DeletionCounter++
  140. return nil
  141. }
  142. func (nm *NeedleMap) Close() {
  143. _ = nm.indexFile.Close()
  144. }
  145. func (nm *NeedleMap) Destroy() error {
  146. nm.Close()
  147. return os.Remove(nm.indexFile.Name())
  148. }
  149. func (nm NeedleMap) ContentSize() uint64 {
  150. return nm.FileByteCounter
  151. }
  152. func (nm NeedleMap) DeletedSize() uint64 {
  153. return nm.DeletionByteCounter
  154. }
  155. func (nm NeedleMap) FileCount() int {
  156. return nm.FileCounter
  157. }
  158. func (nm NeedleMap) DeletedCount() int {
  159. return nm.DeletionCounter
  160. }
  161. func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
  162. return nm.m.Visit(visit)
  163. }
  164. func (nm NeedleMap) MaxFileKey() uint64 {
  165. return nm.MaximumFileKey
  166. }