package storage

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// The watermark counts how many index entries have already been applied to the
// LevelDB store. It is persisted every watermarkBatchSize operations so that a
// rebuild can resume from the last checkpoint instead of replaying the whole
// .idx file.
const watermarkBatchSize = 10000

var watermarkKey = []byte("idx_entry_watermark")

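// LevelDbNeedleMap maps needle ids to (offset, size) pairs, appending every
// change to the .idx file and mirroring it in LevelDB for fast lookups.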
type LevelDbNeedleMap struct {
	baseNeedleMapper
	dbFileName  string
	db          *leveldb.DB
	recordCount uint64
}

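// NewLevelDbNeedleMap opens the LevelDB store that backs the given index file,
// regenerating it from the index file when it is stale and recovering it when
// it is corrupted, then initializes the record count and watermark from the
// index file size.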
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
	m = &LevelDbNeedleMap{dbFileName: dbFileName}
	m.indexFile = indexFile
	if !isLevelDbFresh(dbFileName, indexFile) {
		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
		if err = generateLevelDbFile(dbFileName, indexFile); err != nil {
			return nil, fmt.Errorf("generate leveldb file %s: %v", dbFileName, err)
		}
		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
	}
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
	} else {
		m.indexFileOffset = stat.Size()
	}
	glog.V(1).Infof("Opening %s...", dbFileName)
	if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
		if errors.IsCorrupted(err) {
			m.db, err = leveldb.RecoverFile(dbFileName, opts)
		}
		if err != nil {
			return
		}
	}
	glog.V(0).Infof("Loading %s..., watermark: %d", dbFileName, getWatermark(m.db))
	m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	err = setWatermark(m.db, watermark)
	if err != nil {
		glog.Fatalf("set watermark for %s error: %s", dbFileName, err)
		return
	}
	mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
	if indexLoadError != nil {
		return nil, indexLoadError
	}
	m.mapMetric = *mm
	return
}

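// isLevelDbFresh reports whether the LevelDB store is at least as new as the
// index file, by comparing the modification time of the database LOG file with
// that of the index file.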
func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
	// normally we always write to index file first
	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
	if err != nil {
		return false
	}
	defer dbLogFile.Close()
	dbStat, dbStatErr := dbLogFile.Stat()
	indexStat, indexStatErr := indexFile.Stat()
	if dbStatErr != nil || indexStatErr != nil {
		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
		return false
	}
	return dbStat.ModTime().After(indexStat.ModTime())
}

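// generateLevelDbFile rebuilds the LevelDB store from the index file, starting
// at the persisted watermark so entries that are already in the store are not
// replayed.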
func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
	db, err := leveldb.OpenFile(dbFileName, nil)
	if err != nil {
		return err
	}
	defer db.Close()
	watermark := getWatermark(db)
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
		return err
	} else {
		if watermark*NeedleMapEntrySize > uint64(stat.Size()) {
			glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
		}
		glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries: %d", dbFileName, watermark, (uint64(stat.Size())-watermark*NeedleMapEntrySize)/NeedleMapEntrySize)
	}
	return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
		if !offset.IsZero() && size.IsValid() {
			return levelDbWrite(db, key, offset, size, false, 0)
		}
		return levelDbDelete(db, key)
	})
}

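// Get looks up a needle id in LevelDB and returns its offset and size.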
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes[0:NeedleIdSize], key)
	data, err := m.db.Get(bytes, nil)
	if err != nil || len(data) != OffsetSize+SizeSize {
		return nil, false
	}
	offset := BytesToOffset(data[0:OffsetSize])
	size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
	return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}

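// Put appends the entry to the index file first, then writes it to LevelDB.
// Every watermarkBatchSize records, the current record count is persisted as
// the watermark.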
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
	var oldSize Size
	var watermark uint64
	if oldNeedle, ok := m.Get(key); ok {
		oldSize = oldNeedle.Size
	}
	m.logPut(key, oldSize, size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, size); err != nil {
		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		glog.V(1).Infof("put cnt: %d for %s, watermark: %d", m.recordCount, m.dbFileName, watermark)
	}
	// only persist the watermark when a full batch boundary is reached
	return levelDbWrite(m.db, key, offset, size, watermark != 0, watermark)
}

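// getWatermark reads the persisted watermark from LevelDB, returning 0 when no
// watermark has been stored yet.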
func getWatermark(db *leveldb.DB) uint64 {
	data, err := db.Get(watermarkKey, nil)
	if err != nil || len(data) != 8 {
		glog.V(1).Infof("read previous watermark from db: %v, %d", err, len(data))
		return 0
	}
	return util.BytesToUint64(data)
}

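// setWatermark persists the given watermark under watermarkKey.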
func setWatermark(db *leveldb.DB, watermark uint64) error {
	glog.V(3).Infof("set watermark %d", watermark)
	wmBytes := make([]byte, 8)
	util.Uint64toBytes(wmBytes, watermark)
	if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
		return fmt.Errorf("failed to setWatermark: %v", err)
	}
	return nil
}

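// levelDbWrite stores one needle entry in LevelDB and, if requested, persists
// the watermark in the same call.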
func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
	bytes := needle_map.ToBytes(key, offset, size)
	if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
		return fmt.Errorf("failed to write leveldb: %v", err)
	}
	// set watermark
	if updateWatermark {
		return setWatermark(db, watermark)
	}
	return nil
}

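// levelDbDelete removes a needle entry from LevelDB.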
func levelDbDelete(db *leveldb.DB, key NeedleId) error {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes, key)
	return db.Delete(bytes, nil)
}

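// Delete writes a tombstone to the index file and marks the entry as deleted
// in LevelDB by storing its negated size.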
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
	var watermark uint64
	oldNeedle, found := m.Get(key)
	if !found || oldNeedle.Size.IsDeleted() {
		return nil
	}
	m.logDelete(oldNeedle.Size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
		return err
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	}
	// only persist the watermark when a full batch boundary is reached
	return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark != 0, watermark)
}

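// Close syncs and closes the index file, then closes the LevelDB store.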
func (m *LevelDbNeedleMap) Close() {
	if m.indexFile != nil {
		indexFileName := m.indexFile.Name()
		if err := m.indexFile.Sync(); err != nil {
			glog.Warningf("sync file %s failed: %v", indexFileName, err)
		}
		if err := m.indexFile.Close(); err != nil {
			glog.Warningf("close index file %s failed: %v", indexFileName, err)
		}
	}
	if m.db != nil {
		if err := m.db.Close(); err != nil {
			glog.Warningf("close levelDB failed: %v", err)
		}
	}
}

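// Destroy closes the map and removes both the index file and the LevelDB
// directory from disk.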
func (m *LevelDbNeedleMap) Destroy() error {
	m.Close()
	os.Remove(m.indexFile.Name())
	return os.RemoveAll(m.dbFileName)
}

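// UpdateNeedleMap swaps in a freshly built LevelDB store (renamed from the
// .cpldb working copy) as the volume's needle map, then resets the record
// count and watermark from the new index file.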
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
	if v.nm != nil {
		v.nm.Close()
		v.nm = nil
	}
	defer func() {
		if v.tmpNm != nil {
			v.tmpNm.Close()
			v.tmpNm = nil
		}
	}()
	levelDbFile := v.FileName(".ldb")
	m.indexFile = indexFile
	err := os.RemoveAll(levelDbFile)
	if err != nil {
		return err
	}
	if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
		return fmt.Errorf("rename %s: %v", levelDbFile, err)
	}
	db, err := leveldb.OpenFile(levelDbFile, opts)
	if err != nil {
		if errors.IsCorrupted(err) {
			db, err = leveldb.RecoverFile(levelDbFile, opts)
		}
		if err != nil {
			return err
		}
	}
	m.db = db
	stat, e := indexFile.Stat()
	if e != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
		return e
	}
	m.indexFileOffset = stat.Size()
	m.recordCount = uint64(stat.Size() / NeedleMapEntrySize)

	// set watermark
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	if err = setWatermark(db, watermark); err != nil {
		glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
		return err
	}
	v.nm = m
	v.tmpNm = nil
	return nil
}

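// DoOffsetLoading builds (or incrementally updates) the temporary .cpldb
// LevelDB store from the index file, starting at entry startFrom, and keeps
// the map metrics in sync while doing so.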
func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
	glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
	dbFileName := v.FileName(".cpldb")
	db, dbErr := leveldb.OpenFile(dbFileName, nil)
	defer func() {
		if dbErr == nil {
			db.Close()
		}
		if err != nil {
			os.RemoveAll(dbFileName)
		}
	}()
	if dbErr != nil {
		if errors.IsCorrupted(dbErr) {
			db, dbErr = leveldb.RecoverFile(dbFileName, nil)
		}
		if dbErr != nil {
			return dbErr
		}
	}
	err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
		m.mapMetric.FileCounter++
		bytes := make([]byte, NeedleIdSize)
		NeedleIdToBytes(bytes[0:NeedleIdSize], key)
		// fresh loading
		if startFrom == 0 {
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
			return e
		}
		// incremental loading
		data, err := db.Get(bytes, nil)
		if err != nil {
			if !strings.Contains(strings.ToLower(err.Error()), "not found") {
				// unexpected error
				return err
			}
			// new needle, unlikely to happen
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
		} else {
			// needle is found
			oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
			oldOffset := BytesToOffset(data[0:OffsetSize])
			if !offset.IsZero() && size.IsValid() {
				// updated needle
				m.mapMetric.FileByteCounter += uint64(size)
				if !oldOffset.IsZero() && oldSize.IsValid() {
					m.mapMetric.DeletionCounter++
					m.mapMetric.DeletionByteCounter += uint64(oldSize)
				}
				e = levelDbWrite(db, key, offset, size, false, 0)
			} else {
				// deleted needle
				m.mapMetric.DeletionCounter++
				m.mapMetric.DeletionByteCounter += uint64(oldSize)
				e = levelDbDelete(db, key)
			}
		}
		return e
	})
	return err
}