You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
4.8 KiB

7 years ago
7 years ago
7 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "github.com/boltdb/bolt"
  6. "errors"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. type BoltDbNeedleMap struct {
  13. dbFileName string
  14. db *bolt.DB
  15. baseNeedleMapper
  16. }
  17. var boltdbBucket = []byte("weed")
  18. var NotFound = errors.New("not found")
  19. // TODO avoid using btree to count deletions.
  20. func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
  21. m = &BoltDbNeedleMap{dbFileName: dbFileName}
  22. m.indexFile = indexFile
  23. if !isBoltDbFresh(dbFileName, indexFile) {
  24. glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
  25. generateBoltDbFile(dbFileName, indexFile)
  26. glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
  27. }
  28. glog.V(1).Infof("Opening %s...", dbFileName)
  29. if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil {
  30. return
  31. }
  32. glog.V(1).Infof("Loading %s...", indexFile.Name())
  33. mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
  34. if indexLoadError != nil {
  35. return nil, indexLoadError
  36. }
  37. m.mapMetric = *mm
  38. return
  39. }
  40. func isBoltDbFresh(dbFileName string, indexFile *os.File) bool {
  41. // normally we always write to index file first
  42. dbLogFile, err := os.Open(dbFileName)
  43. if err != nil {
  44. return false
  45. }
  46. defer dbLogFile.Close()
  47. dbStat, dbStatErr := dbLogFile.Stat()
  48. indexStat, indexStatErr := indexFile.Stat()
  49. if dbStatErr != nil || indexStatErr != nil {
  50. glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
  51. return false
  52. }
  53. return dbStat.ModTime().After(indexStat.ModTime())
  54. }
  55. func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
  56. db, err := bolt.Open(dbFileName, 0644, nil)
  57. if err != nil {
  58. return err
  59. }
  60. defer db.Close()
  61. return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
  62. if offset > 0 && size != TombstoneFileSize {
  63. boltDbWrite(db, key, offset, size)
  64. } else {
  65. boltDbDelete(db, key)
  66. }
  67. return nil
  68. })
  69. }
  70. func (m *BoltDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) {
  71. var offset Offset
  72. var size uint32
  73. bytes := make([]byte, NeedleIdSize)
  74. NeedleIdToBytes(bytes, key)
  75. err := m.db.View(func(tx *bolt.Tx) error {
  76. bucket := tx.Bucket(boltdbBucket)
  77. if bucket == nil {
  78. return fmt.Errorf("Bucket %q not found!", boltdbBucket)
  79. }
  80. data := bucket.Get(bytes)
  81. if len(data) == 0 {
  82. return NotFound
  83. }
  84. if len(data) != OffsetSize+SizeSize {
  85. glog.V(0).Infof("key:%v has wrong data length: %d", key, len(data))
  86. return fmt.Errorf("key:%v has wrong data length: %d", key, len(data))
  87. }
  88. offset = BytesToOffset(data[0:OffsetSize])
  89. size = util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
  90. return nil
  91. })
  92. if err != nil {
  93. return nil, false
  94. }
  95. return &needle.NeedleValue{Key: key, Offset: offset, Size: size}, true
  96. }
  97. func (m *BoltDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
  98. var oldSize uint32
  99. if oldNeedle, ok := m.Get(key); ok {
  100. oldSize = oldNeedle.Size
  101. }
  102. m.logPut(key, oldSize, size)
  103. // write to index file first
  104. if err := m.appendToIndexFile(key, offset, size); err != nil {
  105. return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
  106. }
  107. return boltDbWrite(m.db, key, offset, size)
  108. }
  109. func boltDbWrite(db *bolt.DB,
  110. key NeedleId, offset Offset, size uint32) error {
  111. bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
  112. NeedleIdToBytes(bytes[0:NeedleIdSize], key)
  113. OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
  114. util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
  115. return db.Update(func(tx *bolt.Tx) error {
  116. bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
  117. if err != nil {
  118. return err
  119. }
  120. err = bucket.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize])
  121. if err != nil {
  122. return err
  123. }
  124. return nil
  125. })
  126. }
  127. func boltDbDelete(db *bolt.DB, key NeedleId) error {
  128. bytes := make([]byte, NeedleIdSize)
  129. NeedleIdToBytes(bytes, key)
  130. return db.Update(func(tx *bolt.Tx) error {
  131. bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
  132. if err != nil {
  133. return err
  134. }
  135. err = bucket.Delete(bytes)
  136. if err != nil {
  137. return err
  138. }
  139. return nil
  140. })
  141. }
  142. func (m *BoltDbNeedleMap) Delete(key NeedleId, offset Offset) error {
  143. if oldNeedle, ok := m.Get(key); ok {
  144. m.logDelete(oldNeedle.Size)
  145. }
  146. // write to index file first
  147. if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
  148. return err
  149. }
  150. return boltDbDelete(m.db, key)
  151. }
  152. func (m *BoltDbNeedleMap) Close() {
  153. m.indexFile.Close()
  154. m.db.Close()
  155. }
  156. func (m *BoltDbNeedleMap) Destroy() error {
  157. m.Close()
  158. os.Remove(m.indexFile.Name())
  159. return os.Remove(m.dbFileName)
  160. }