You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
7.1 KiB

3 years ago
3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "strings"
  7. "github.com/syndtr/goleveldb/leveldb/errors"
  8. "github.com/syndtr/goleveldb/leveldb/opt"
  9. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  10. "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/syndtr/goleveldb/leveldb"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  15. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. )
  17. //mark it every milestoneCnt operations
  18. const milestoneCnt = 10000
  19. const milestoneKey = 0xffffffffffffffff - 1
// LevelDbNeedleMap is a needle map whose key -> (offset, size) entries are
// kept in a LevelDB database. The index file and the map metrics are reached
// through the embedded baseNeedleMapper.
type LevelDbNeedleMap struct {
	baseNeedleMapper
	dbFileName string      // path handed to leveldb.OpenFile / RecoverFile
	db         *leveldb.DB // LevelDB handle; may be nil if opening failed
	recordNum  uint64      // count of index entries, bumped on every Put/Delete append
}
  26. func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
  27. m = &LevelDbNeedleMap{dbFileName: dbFileName}
  28. m.indexFile = indexFile
  29. if !isLevelDbFresh(dbFileName, indexFile) {
  30. glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
  31. generateLevelDbFile(dbFileName, indexFile)
  32. glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
  33. }
  34. if stat, err := indexFile.Stat(); err != nil {
  35. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  36. } else {
  37. m.indexFileOffset = stat.Size()
  38. }
  39. glog.V(1).Infof("Opening %s...", dbFileName)
  40. if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
  41. if errors.IsCorrupted(err) {
  42. m.db, err = leveldb.RecoverFile(dbFileName, opts)
  43. }
  44. if err != nil {
  45. return
  46. }
  47. }
  48. glog.V(1).Infof("Loading %s... , milestone: %d", dbFileName, getMileStone(m.db))
  49. m.recordNum = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
  50. milestone := (m.recordNum / milestoneCnt) * milestoneCnt
  51. err = setMileStone(m.db, milestone)
  52. if err != nil {
  53. glog.Fatalf("set milestone for %s error: %s\n", dbFileName, err)
  54. return
  55. }
  56. mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
  57. if indexLoadError != nil {
  58. return nil, indexLoadError
  59. }
  60. m.mapMetric = *mm
  61. return
  62. }
  63. func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
  64. // normally we always write to index file first
  65. dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
  66. if err != nil {
  67. return false
  68. }
  69. defer dbLogFile.Close()
  70. dbStat, dbStatErr := dbLogFile.Stat()
  71. indexStat, indexStatErr := indexFile.Stat()
  72. if dbStatErr != nil || indexStatErr != nil {
  73. glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
  74. return false
  75. }
  76. return dbStat.ModTime().After(indexStat.ModTime())
  77. }
  78. func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
  79. db, err := leveldb.OpenFile(dbFileName, nil)
  80. if err != nil {
  81. return err
  82. }
  83. defer db.Close()
  84. milestone := getMileStone(db)
  85. if stat, err := indexFile.Stat(); err != nil {
  86. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  87. return err
  88. } else {
  89. if milestone*types.NeedleMapEntrySize > uint64(stat.Size()) {
  90. glog.Warningf("wrong milestone %d for filesize %d", milestone, stat.Size())
  91. }
  92. glog.V(0).Infof("generateLevelDbFile %s, milestone %d, num of entries:%d", dbFileName, milestone, (uint64(stat.Size())-milestone*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
  93. }
  94. return idx.WalkIndexFileIncrement(indexFile, milestone, func(key NeedleId, offset Offset, size Size) error {
  95. if !offset.IsZero() && size.IsValid() {
  96. levelDbWrite(db, key, offset, size, false, 0)
  97. } else {
  98. levelDbDelete(db, key)
  99. }
  100. return nil
  101. })
  102. }
  103. func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
  104. bytes := make([]byte, NeedleIdSize)
  105. NeedleIdToBytes(bytes[0:NeedleIdSize], key)
  106. data, err := m.db.Get(bytes, nil)
  107. if err != nil || len(data) != OffsetSize+SizeSize {
  108. return nil, false
  109. }
  110. offset := BytesToOffset(data[0:OffsetSize])
  111. size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
  112. return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
  113. }
  114. func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
  115. var oldSize Size
  116. var milestone uint64
  117. if oldNeedle, ok := m.Get(key); ok {
  118. oldSize = oldNeedle.Size
  119. }
  120. m.logPut(key, oldSize, size)
  121. // write to index file first
  122. if err := m.appendToIndexFile(key, offset, size); err != nil {
  123. return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
  124. }
  125. m.recordNum++
  126. if m.recordNum%milestoneCnt != 0 {
  127. milestone = 0
  128. } else {
  129. milestone = (m.recordNum / milestoneCnt) * milestoneCnt
  130. glog.V(1).Infof("put cnt:%d for %s,milestone: %d", m.recordNum, m.dbFileName, milestone)
  131. }
  132. return levelDbWrite(m.db, key, offset, size, milestone == 0, milestone)
  133. }
  134. func getMileStone(db *leveldb.DB) uint64 {
  135. var mskBytes = make([]byte, 8)
  136. util.Uint64toBytes(mskBytes, milestoneKey)
  137. data, err := db.Get(mskBytes, nil)
  138. if err != nil || len(data) != 8 {
  139. glog.Warningf("get milestone from db error: %v, %d", err, len(data))
  140. if !strings.Contains(strings.ToLower(err.Error()), "not found") {
  141. err = setMileStone(db, 0)
  142. if err != nil {
  143. glog.Errorf("failed to set milestone: %v", err)
  144. }
  145. }
  146. return 0
  147. }
  148. return util.BytesToUint64(data)
  149. }
  150. func setMileStone(db *leveldb.DB, milestone uint64) error {
  151. glog.V(1).Infof("set milestone %d", milestone)
  152. var mskBytes = make([]byte, 8)
  153. util.Uint64toBytes(mskBytes, milestoneKey)
  154. var msBytes = make([]byte, 8)
  155. util.Uint64toBytes(msBytes, milestone)
  156. if err := db.Put(mskBytes, msBytes, nil); err != nil {
  157. return fmt.Errorf("failed to setMileStone: %v", err)
  158. }
  159. return nil
  160. }
  161. func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, upateMilstone bool, milestone uint64) error {
  162. bytes := needle_map.ToBytes(key, offset, size)
  163. if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
  164. return fmt.Errorf("failed to write leveldb: %v", err)
  165. }
  166. // set milestone
  167. if upateMilstone {
  168. return setMileStone(db, milestone)
  169. }
  170. return nil
  171. }
  172. func levelDbDelete(db *leveldb.DB, key NeedleId) error {
  173. bytes := make([]byte, NeedleIdSize)
  174. NeedleIdToBytes(bytes, key)
  175. return db.Delete(bytes, nil)
  176. }
  177. func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
  178. var milestone uint64
  179. oldNeedle, found := m.Get(key)
  180. if !found || oldNeedle.Size.IsDeleted() {
  181. return nil
  182. }
  183. m.logDelete(oldNeedle.Size)
  184. // write to index file first
  185. if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
  186. return err
  187. }
  188. m.recordNum++
  189. if m.recordNum%milestoneCnt != 0 {
  190. milestone = 0
  191. } else {
  192. milestone = (m.recordNum / milestoneCnt) * milestoneCnt
  193. }
  194. return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, milestone == 0, milestone)
  195. }
  196. func (m *LevelDbNeedleMap) Close() {
  197. indexFileName := m.indexFile.Name()
  198. if err := m.indexFile.Sync(); err != nil {
  199. glog.Warningf("sync file %s failed: %v", indexFileName, err)
  200. }
  201. if err := m.indexFile.Close(); err != nil {
  202. glog.Warningf("close index file %s failed: %v", indexFileName, err)
  203. }
  204. if m.db != nil {
  205. if err := m.db.Close(); err != nil {
  206. glog.Warningf("close levelDB failed: %v", err)
  207. }
  208. }
  209. }
  210. func (m *LevelDbNeedleMap) Destroy() error {
  211. m.Close()
  212. os.Remove(m.indexFile.Name())
  213. return os.RemoveAll(m.dbFileName)
  214. }