You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

335 lines
9.3 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
6 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
6 years ago
2 years ago
2 years ago
2 years ago
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "github.com/syndtr/goleveldb/leveldb/errors"
  7. "github.com/syndtr/goleveldb/leveldb/opt"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "github.com/syndtr/goleveldb/leveldb"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  14. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  15. )
  const (
  	// watermarkBatchSize is the granularity of the persisted watermark: the
  	// watermark is always the index record count rounded down to a multiple
  	// of this value.
  	watermarkBatchSize = 10000
  )

  var (
  	// watermarkKey is the LevelDB key under which the watermark (an 8-byte
  	// encoded uint64) is stored next to the needle entries.
  	watermarkKey = []byte("idx_entry_watermark")
  )
  19. type LevelDbNeedleMap struct {
  20. baseNeedleMapper
  21. dbFileName string
  22. db *leveldb.DB
  23. recordCount uint64
  24. }
  25. func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
  26. m = &LevelDbNeedleMap{dbFileName: dbFileName}
  27. m.indexFile = indexFile
  28. if !isLevelDbFresh(dbFileName, indexFile) {
  29. glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
  30. generateLevelDbFile(dbFileName, indexFile)
  31. glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
  32. }
  33. if stat, err := indexFile.Stat(); err != nil {
  34. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  35. } else {
  36. m.indexFileOffset = stat.Size()
  37. }
  38. glog.V(1).Infof("Opening %s...", dbFileName)
  39. if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
  40. if errors.IsCorrupted(err) {
  41. m.db, err = leveldb.RecoverFile(dbFileName, opts)
  42. }
  43. if err != nil {
  44. return
  45. }
  46. }
  47. glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
  48. m.recordCount = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
  49. watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
  50. err = setWatermark(m.db, watermark)
  51. if err != nil {
  52. glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
  53. return
  54. }
  55. mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
  56. if indexLoadError != nil {
  57. return nil, indexLoadError
  58. }
  59. m.mapMetric = *mm
  60. return
  61. }
  62. func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
  63. // normally we always write to index file first
  64. dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
  65. if err != nil {
  66. return false
  67. }
  68. defer dbLogFile.Close()
  69. dbStat, dbStatErr := dbLogFile.Stat()
  70. indexStat, indexStatErr := indexFile.Stat()
  71. if dbStatErr != nil || indexStatErr != nil {
  72. glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
  73. return false
  74. }
  75. return dbStat.ModTime().After(indexStat.ModTime())
  76. }
  77. func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
  78. db, err := leveldb.OpenFile(dbFileName, nil)
  79. if err != nil {
  80. return err
  81. }
  82. defer db.Close()
  83. watermark := getWatermark(db)
  84. if stat, err := indexFile.Stat(); err != nil {
  85. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  86. return err
  87. } else {
  88. if watermark*types.NeedleMapEntrySize > uint64(stat.Size()) {
  89. glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
  90. }
  91. glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
  92. }
  93. return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
  94. if !offset.IsZero() && size.IsValid() {
  95. levelDbWrite(db, key, offset, size, false, 0)
  96. } else {
  97. levelDbDelete(db, key)
  98. }
  99. return nil
  100. })
  101. }
  102. func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
  103. bytes := make([]byte, NeedleIdSize)
  104. NeedleIdToBytes(bytes[0:NeedleIdSize], key)
  105. data, err := m.db.Get(bytes, nil)
  106. if err != nil || len(data) != OffsetSize+SizeSize {
  107. return nil, false
  108. }
  109. offset := BytesToOffset(data[0:OffsetSize])
  110. size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
  111. return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
  112. }
  113. func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
  114. var oldSize Size
  115. var watermark uint64
  116. if oldNeedle, ok := m.Get(key); ok {
  117. oldSize = oldNeedle.Size
  118. }
  119. m.logPut(key, oldSize, size)
  120. // write to index file first
  121. if err := m.appendToIndexFile(key, offset, size); err != nil {
  122. return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
  123. }
  124. m.recordCount++
  125. if m.recordCount%watermarkBatchSize != 0 {
  126. watermark = 0
  127. } else {
  128. watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
  129. glog.V(1).Infof("put cnt:%d for %s,watermark: %d", m.recordCount, m.dbFileName, watermark)
  130. }
  131. return levelDbWrite(m.db, key, offset, size, watermark == 0, watermark)
  132. }
  133. func getWatermark(db *leveldb.DB) uint64 {
  134. data, err := db.Get(watermarkKey, nil)
  135. if err != nil || len(data) != 8 {
  136. glog.Warningf("get watermark from db error: %v, %d", err, len(data))
  137. /*
  138. if !strings.Contains(strings.ToLower(err.Error()), "not found") {
  139. err = setWatermark(db, 0)
  140. if err != nil {
  141. glog.Errorf("failed to set watermark: %v", err)
  142. }
  143. }
  144. */
  145. return 0
  146. }
  147. return util.BytesToUint64(data)
  148. }
  149. func setWatermark(db *leveldb.DB, watermark uint64) error {
  150. glog.V(3).Infof("set watermark %d", watermark)
  151. var wmBytes = make([]byte, 8)
  152. util.Uint64toBytes(wmBytes, watermark)
  153. if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
  154. return fmt.Errorf("failed to setWatermark: %v", err)
  155. }
  156. return nil
  157. }
  158. func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
  159. bytes := needle_map.ToBytes(key, offset, size)
  160. if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
  161. return fmt.Errorf("failed to write leveldb: %v", err)
  162. }
  163. // set watermark
  164. if updateWatermark {
  165. return setWatermark(db, watermark)
  166. }
  167. return nil
  168. }
  169. func levelDbDelete(db *leveldb.DB, key NeedleId) error {
  170. bytes := make([]byte, NeedleIdSize)
  171. NeedleIdToBytes(bytes, key)
  172. return db.Delete(bytes, nil)
  173. }
  174. func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
  175. var watermark uint64
  176. oldNeedle, found := m.Get(key)
  177. if !found || oldNeedle.Size.IsDeleted() {
  178. return nil
  179. }
  180. m.logDelete(oldNeedle.Size)
  181. // write to index file first
  182. if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
  183. return err
  184. }
  185. m.recordCount++
  186. if m.recordCount%watermarkBatchSize != 0 {
  187. watermark = 0
  188. } else {
  189. watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
  190. }
  191. return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark == 0, watermark)
  192. }
  193. func (m *LevelDbNeedleMap) Close() {
  194. if m.indexFile != nil {
  195. indexFileName := m.indexFile.Name()
  196. if err := m.indexFile.Sync(); err != nil {
  197. glog.Warningf("sync file %s failed: %v", indexFileName, err)
  198. }
  199. if err := m.indexFile.Close(); err != nil {
  200. glog.Warningf("close index file %s failed: %v", indexFileName, err)
  201. }
  202. }
  203. if m.db != nil {
  204. if err := m.db.Close(); err != nil {
  205. glog.Warningf("close levelDB failed: %v", err)
  206. }
  207. }
  208. }
  209. func (m *LevelDbNeedleMap) Destroy() error {
  210. m.Close()
  211. os.Remove(m.indexFile.Name())
  212. return os.RemoveAll(m.dbFileName)
  213. }
  214. func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
  215. if v.nm != nil {
  216. v.nm.Close()
  217. v.nm = nil
  218. }
  219. defer func() {
  220. if v.tmpNm != nil {
  221. v.tmpNm.Close()
  222. v.tmpNm = nil
  223. }
  224. }()
  225. levelDbFile := v.FileName(".ldb")
  226. m.indexFile = indexFile
  227. err := os.RemoveAll(levelDbFile)
  228. if err != nil {
  229. return err
  230. }
  231. if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
  232. return fmt.Errorf("rename %s: %v", levelDbFile, err)
  233. }
  234. db, err := leveldb.OpenFile(levelDbFile, opts)
  235. if err != nil {
  236. if errors.IsCorrupted(err) {
  237. db, err = leveldb.RecoverFile(levelDbFile, opts)
  238. }
  239. if err != nil {
  240. return err
  241. }
  242. }
  243. m.db = db
  244. stat, e := indexFile.Stat()
  245. if e != nil {
  246. glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
  247. return e
  248. }
  249. m.indexFileOffset = stat.Size()
  250. m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize)
  251. //set watermark
  252. watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
  253. err = setWatermark(db, uint64(watermark))
  254. if err != nil {
  255. glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
  256. return err
  257. }
  258. v.nm = m
  259. v.tmpNm = nil
  260. return e
  261. }
  262. func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
  263. glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
  264. dbFileName := v.FileName(".cpldb")
  265. db, dbErr := leveldb.OpenFile(dbFileName, nil)
  266. defer func() {
  267. if dbErr == nil {
  268. db.Close()
  269. }
  270. if err != nil {
  271. os.RemoveAll(dbFileName)
  272. }
  273. }()
  274. if dbErr != nil {
  275. if errors.IsCorrupted(err) {
  276. db, dbErr = leveldb.RecoverFile(dbFileName, nil)
  277. }
  278. if dbErr != nil {
  279. return dbErr
  280. }
  281. }
  282. err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
  283. if !offset.IsZero() && size.IsValid() {
  284. e = levelDbWrite(db, key, offset, size, false, 0)
  285. } else {
  286. e = levelDbDelete(db, key)
  287. }
  288. return e
  289. })
  290. if err != nil {
  291. return err
  292. }
  293. if startFrom != 0 {
  294. return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
  295. }
  296. return nil
  297. }
  298. func (m *LevelDbNeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
  299. return needleMapMetricFromIndexFile(indexFile, &m.mapMetric)
  300. }