You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
7.1 KiB

3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
6 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "github.com/syndtr/goleveldb/leveldb/errors"
  7. "github.com/syndtr/goleveldb/leveldb/opt"
  8. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  14. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. )
  16. //mark it every milestoneCnt operations
  17. const milestoneCnt = 10000
  18. const milestoneKey = 0xffffffffffffffff - 1
  19. type LevelDbNeedleMap struct {
  20. baseNeedleMapper
  21. dbFileName string
  22. db *leveldb.DB
  23. recordNum uint64
  24. }
  25. func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
  26. m = &LevelDbNeedleMap{dbFileName: dbFileName}
  27. m.indexFile = indexFile
  28. if !isLevelDbFresh(dbFileName, indexFile) {
  29. glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
  30. generateLevelDbFile(dbFileName, indexFile)
  31. glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
  32. }
  33. if stat, err := indexFile.Stat(); err != nil {
  34. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  35. } else {
  36. m.indexFileOffset = stat.Size()
  37. }
  38. glog.V(1).Infof("Opening %s...", dbFileName)
  39. if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
  40. if errors.IsCorrupted(err) {
  41. m.db, err = leveldb.RecoverFile(dbFileName, opts)
  42. }
  43. if err != nil {
  44. return
  45. }
  46. }
  47. glog.V(1).Infof("Loading %s... , milestone: %d", dbFileName, getMileStone(m.db))
  48. m.recordNum = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
  49. milestone := (m.recordNum / milestoneCnt) * milestoneCnt
  50. err = setMileStone(m.db, milestone)
  51. if err != nil {
  52. glog.Fatalf("set milestone for %s error: %s\n", dbFileName, err)
  53. return
  54. }
  55. mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
  56. if indexLoadError != nil {
  57. return nil, indexLoadError
  58. }
  59. m.mapMetric = *mm
  60. return
  61. }
  62. func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
  63. // normally we always write to index file first
  64. dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
  65. if err != nil {
  66. return false
  67. }
  68. defer dbLogFile.Close()
  69. dbStat, dbStatErr := dbLogFile.Stat()
  70. indexStat, indexStatErr := indexFile.Stat()
  71. if dbStatErr != nil || indexStatErr != nil {
  72. glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
  73. return false
  74. }
  75. return dbStat.ModTime().After(indexStat.ModTime())
  76. }
  77. func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
  78. db, err := leveldb.OpenFile(dbFileName, nil)
  79. if err != nil {
  80. return err
  81. }
  82. defer db.Close()
  83. milestone := getMileStone(db)
  84. if stat, err := indexFile.Stat(); err != nil {
  85. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  86. return err
  87. } else {
  88. if milestone*types.NeedleMapEntrySize > uint64(stat.Size()) {
  89. glog.Warningf("wrong milestone %d for filesize %d", milestone, stat.Size())
  90. }
  91. glog.V(0).Infof("generateLevelDbFile %s, milestone %d, num of entries:%d", dbFileName, milestone, (uint64(stat.Size())-milestone*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
  92. }
  93. return idx.WalkIndexFileIncrement(indexFile, milestone, func(key NeedleId, offset Offset, size Size) error {
  94. if !offset.IsZero() && size.IsValid() {
  95. levelDbWrite(db, key, offset, size, false, 0)
  96. } else {
  97. levelDbDelete(db, key)
  98. }
  99. return nil
  100. })
  101. }
  102. func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
  103. bytes := make([]byte, NeedleIdSize)
  104. NeedleIdToBytes(bytes[0:NeedleIdSize], key)
  105. data, err := m.db.Get(bytes, nil)
  106. if err != nil || len(data) != OffsetSize+SizeSize {
  107. return nil, false
  108. }
  109. offset := BytesToOffset(data[0:OffsetSize])
  110. size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
  111. return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
  112. }
  113. func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
  114. var oldSize Size
  115. var milestone uint64
  116. if oldNeedle, ok := m.Get(key); ok {
  117. oldSize = oldNeedle.Size
  118. }
  119. m.logPut(key, oldSize, size)
  120. // write to index file first
  121. if err := m.appendToIndexFile(key, offset, size); err != nil {
  122. return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
  123. }
  124. m.recordNum++
  125. if m.recordNum%milestoneCnt != 0 {
  126. milestone = 0
  127. } else {
  128. milestone = (m.recordNum / milestoneCnt) * milestoneCnt
  129. glog.V(1).Infof("put cnt:%d for %s,milestone: %d", m.recordNum, m.dbFileName, milestone)
  130. }
  131. return levelDbWrite(m.db, key, offset, size, milestone == 0, milestone)
  132. }
  133. func getMileStone(db *leveldb.DB) uint64 {
  134. var mskBytes = make([]byte, 8)
  135. util.Uint64toBytes(mskBytes, milestoneKey)
  136. data, err := db.Get(mskBytes, nil)
  137. if err != nil || len(data) != 8 {
  138. glog.Warningf("get milestone from db error: %v, %d", err, len(data))
  139. /*
  140. if !strings.Contains(strings.ToLower(err.Error()), "not found") {
  141. err = setMileStone(db, 0)
  142. if err != nil {
  143. glog.Errorf("failed to set milestone: %v", err)
  144. }
  145. }
  146. */
  147. return 0
  148. }
  149. return util.BytesToUint64(data)
  150. }
  151. func setMileStone(db *leveldb.DB, milestone uint64) error {
  152. glog.V(1).Infof("set milestone %d", milestone)
  153. var mskBytes = make([]byte, 8)
  154. util.Uint64toBytes(mskBytes, milestoneKey)
  155. var msBytes = make([]byte, 8)
  156. util.Uint64toBytes(msBytes, milestone)
  157. if err := db.Put(mskBytes, msBytes, nil); err != nil {
  158. return fmt.Errorf("failed to setMileStone: %v", err)
  159. }
  160. return nil
  161. }
  162. func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, upateMilstone bool, milestone uint64) error {
  163. bytes := needle_map.ToBytes(key, offset, size)
  164. if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
  165. return fmt.Errorf("failed to write leveldb: %v", err)
  166. }
  167. // set milestone
  168. if upateMilstone {
  169. return setMileStone(db, milestone)
  170. }
  171. return nil
  172. }
  173. func levelDbDelete(db *leveldb.DB, key NeedleId) error {
  174. bytes := make([]byte, NeedleIdSize)
  175. NeedleIdToBytes(bytes, key)
  176. return db.Delete(bytes, nil)
  177. }
  178. func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
  179. var milestone uint64
  180. oldNeedle, found := m.Get(key)
  181. if !found || oldNeedle.Size.IsDeleted() {
  182. return nil
  183. }
  184. m.logDelete(oldNeedle.Size)
  185. // write to index file first
  186. if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
  187. return err
  188. }
  189. m.recordNum++
  190. if m.recordNum%milestoneCnt != 0 {
  191. milestone = 0
  192. } else {
  193. milestone = (m.recordNum / milestoneCnt) * milestoneCnt
  194. }
  195. return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, milestone == 0, milestone)
  196. }
  197. func (m *LevelDbNeedleMap) Close() {
  198. indexFileName := m.indexFile.Name()
  199. if err := m.indexFile.Sync(); err != nil {
  200. glog.Warningf("sync file %s failed: %v", indexFileName, err)
  201. }
  202. if err := m.indexFile.Close(); err != nil {
  203. glog.Warningf("close index file %s failed: %v", indexFileName, err)
  204. }
  205. if m.db != nil {
  206. if err := m.db.Close(); err != nil {
  207. glog.Warningf("close levelDB failed: %v", err)
  208. }
  209. }
  210. }
  211. func (m *LevelDbNeedleMap) Destroy() error {
  212. m.Close()
  213. os.Remove(m.indexFile.Name())
  214. return os.RemoveAll(m.dbFileName)
  215. }