package storage

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"

	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// The watermark records how many index entries have already been applied to
// the leveldb. It is persisted once every watermarkBatchSize operations so
// that regeneration after a restart can resume from it instead of replaying
// the whole .idx file.
const watermarkBatchSize = 10000

var watermarkKey = []byte("idx_entry_watermark")
// LevelDbNeedleMap maps needle ids to (offset, size) pairs, backed by a
// leveldb store that mirrors the append-only .idx file.
type LevelDbNeedleMap struct {
	baseNeedleMapper
	dbFileName  string
	db          *leveldb.DB
	recordCount uint64
}
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
	m = &LevelDbNeedleMap{dbFileName: dbFileName}
	m.indexFile = indexFile
	if !isLevelDbFresh(dbFileName, indexFile) {
		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
		if err = generateLevelDbFile(dbFileName, indexFile); err != nil {
			return nil, fmt.Errorf("generate leveldb file %s: %v", dbFileName, err)
		}
		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
	}
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
	} else {
		m.indexFileOffset = stat.Size()
	}
	glog.V(1).Infof("Opening %s...", dbFileName)

	if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
		if errors.IsCorrupted(err) {
			m.db, err = leveldb.RecoverFile(dbFileName, opts)
		}
		if err != nil {
			return
		}
	}
	glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
	m.recordCount = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	err = setWatermark(m.db, watermark)
	if err != nil {
		glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
		return
	}
	mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
	if indexLoadError != nil {
		return nil, indexLoadError
	}
	m.mapMetric = *mm
	return
}
// isLevelDbFresh reports whether the leveldb is at least as new as the index
// file. Writes always go to the index file first, so a leveldb whose LOG file
// was modified after the .idx file is considered up to date.
func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
	// normally we always write to index file first
	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
	if err != nil {
		return false
	}
	defer dbLogFile.Close()
	dbStat, dbStatErr := dbLogFile.Stat()
	indexStat, indexStatErr := indexFile.Stat()
	if dbStatErr != nil || indexStatErr != nil {
		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
		return false
	}
	return dbStat.ModTime().After(indexStat.ModTime())
}
// generateLevelDbFile replays the index file into the leveldb, starting from
// the persisted watermark so that already-applied entries are skipped.
func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
	db, err := leveldb.OpenFile(dbFileName, nil)
	if err != nil {
		return err
	}
	defer db.Close()
	watermark := getWatermark(db)
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
		return err
	} else {
		if watermark*types.NeedleMapEntrySize > uint64(stat.Size()) {
			glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
		}
		glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
	}
	return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
		if !offset.IsZero() && size.IsValid() {
			return levelDbWrite(db, key, offset, size, false, 0)
		}
		return levelDbDelete(db, key)
	})
}
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes[0:NeedleIdSize], key)
	data, err := m.db.Get(bytes, nil)
	if err != nil || len(data) != OffsetSize+SizeSize {
		return nil, false
	}
	offset := BytesToOffset(data[0:OffsetSize])
	size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
	return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
	var oldSize Size
	var watermark uint64
	if oldNeedle, ok := m.Get(key); ok {
		oldSize = oldNeedle.Size
	}
	m.logPut(key, oldSize, size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, size); err != nil {
		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
	}
	m.recordCount++
	// persist the watermark only every watermarkBatchSize records
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		glog.V(1).Infof("put cnt:%d for %s, watermark: %d", m.recordCount, m.dbFileName, watermark)
	}
	return levelDbWrite(m.db, key, offset, size, watermark != 0, watermark)
}
func getWatermark(db *leveldb.DB) uint64 {
	data, err := db.Get(watermarkKey, nil)
	if err != nil || len(data) != 8 {
		glog.V(1).Infof("read previous watermark from db: %v, %d", err, len(data))
		return 0
	}
	return util.BytesToUint64(data)
}

func setWatermark(db *leveldb.DB, watermark uint64) error {
	glog.V(3).Infof("set watermark %d", watermark)
	wmBytes := make([]byte, 8)
	util.Uint64toBytes(wmBytes, watermark)
	if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
		return fmt.Errorf("failed to setWatermark: %v", err)
	}
	return nil
}
func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
	bytes := needle_map.ToBytes(key, offset, size)
	if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
		return fmt.Errorf("failed to write leveldb: %v", err)
	}
	// set watermark
	if updateWatermark {
		return setWatermark(db, watermark)
	}
	return nil
}

func levelDbDelete(db *leveldb.DB, key NeedleId) error {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes, key)
	return db.Delete(bytes, nil)
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
	var watermark uint64
	oldNeedle, found := m.Get(key)
	if !found || oldNeedle.Size.IsDeleted() {
		return nil
	}
	m.logDelete(oldNeedle.Size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
		return err
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	}
	return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark != 0, watermark)
}
func (m *LevelDbNeedleMap) Close() {
	if m.indexFile != nil {
		indexFileName := m.indexFile.Name()
		if err := m.indexFile.Sync(); err != nil {
			glog.Warningf("sync file %s failed: %v", indexFileName, err)
		}
		if err := m.indexFile.Close(); err != nil {
			glog.Warningf("close index file %s failed: %v", indexFileName, err)
		}
	}
	if m.db != nil {
		if err := m.db.Close(); err != nil {
			glog.Warningf("close levelDB failed: %v", err)
		}
	}
}

func (m *LevelDbNeedleMap) Destroy() error {
	m.Close()
	os.Remove(m.indexFile.Name())
	return os.RemoveAll(m.dbFileName)
}
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
	if v.nm != nil {
		v.nm.Close()
		v.nm = nil
	}
	defer func() {
		if v.tmpNm != nil {
			v.tmpNm.Close()
			v.tmpNm = nil
		}
	}()
	levelDbFile := v.FileName(".ldb")
	m.indexFile = indexFile
	err := os.RemoveAll(levelDbFile)
	if err != nil {
		return err
	}
	if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
		return fmt.Errorf("rename %s: %v", levelDbFile, err)
	}

	db, err := leveldb.OpenFile(levelDbFile, opts)
	if err != nil {
		if errors.IsCorrupted(err) {
			db, err = leveldb.RecoverFile(levelDbFile, opts)
		}
		if err != nil {
			return err
		}
	}
	m.db = db

	stat, e := indexFile.Stat()
	if e != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
		return e
	}
	m.indexFileOffset = stat.Size()
	m.recordCount = uint64(stat.Size() / types.NeedleMapEntrySize)

	// set watermark
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	if err = setWatermark(db, watermark); err != nil {
		glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
		return err
	}
	v.nm = m
	v.tmpNm = nil
	return nil
}
func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
	glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
	dbFileName := v.FileName(".cpldb")
	db, dbErr := leveldb.OpenFile(dbFileName, nil)
	defer func() {
		if dbErr == nil {
			db.Close()
		}
		if err != nil {
			os.RemoveAll(dbFileName)
		}
	}()
	if dbErr != nil {
		if errors.IsCorrupted(dbErr) {
			db, dbErr = leveldb.RecoverFile(dbFileName, nil)
		}
		if dbErr != nil {
			return dbErr
		}
	}

	err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
		m.mapMetric.FileCounter++
		bytes := make([]byte, NeedleIdSize)
		NeedleIdToBytes(bytes[0:NeedleIdSize], key)
		if startFrom == 0 {
			// fresh loading
			m.mapMetric.FileByteCounter += uint64(size)
			return levelDbWrite(db, key, offset, size, false, 0)
		}
		// incremental loading
		data, err := db.Get(bytes, nil)
		if err != nil {
			if !strings.Contains(strings.ToLower(err.Error()), "not found") {
				// unexpected error
				return err
			}
			// new needle, unlikely to happen
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
		} else {
			// needle is found
			oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
			oldOffset := BytesToOffset(data[0:OffsetSize])
			if !offset.IsZero() && size.IsValid() {
				// updated needle
				m.mapMetric.FileByteCounter += uint64(size)
				if !oldOffset.IsZero() && oldSize.IsValid() {
					m.mapMetric.DeletionCounter++
					m.mapMetric.DeletionByteCounter += uint64(oldSize)
				}
				e = levelDbWrite(db, key, offset, size, false, 0)
			} else {
				// deleted needle
				m.mapMetric.DeletionCounter++
				m.mapMetric.DeletionByteCounter += uint64(oldSize)
				e = levelDbDelete(db, key)
			}
		}
		return e
	})
	return err
}
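
// Usage note: the snippet below is a minimal caller-side sketch, not part of
// this file or of SeaweedFS itself. It assumes a hypothetical volume index at
// /tmp/vol_1.idx and a companion leveldb directory /tmp/vol_1.ldb, and only
// uses exported identifiers that appear above (NewLevelDbNeedleMap, Get,
// Close, types.NeedleId).
//
//	package main
//
//	import (
//		"fmt"
//		"log"
//		"os"
//
//		"github.com/seaweedfs/seaweedfs/weed/storage"
//		"github.com/seaweedfs/seaweedfs/weed/storage/types"
//	)
//
//	func main() {
//		// Hypothetical paths; a real volume server derives these from the volume file name.
//		indexFile, err := os.OpenFile("/tmp/vol_1.idx", os.O_RDWR|os.O_CREATE, 0644)
//		if err != nil {
//			log.Fatal(err)
//		}
//
//		// nil options fall back to goleveldb defaults, as generateLevelDbFile does above.
//		nm, err := storage.NewLevelDbNeedleMap("/tmp/vol_1.ldb", indexFile, nil)
//		if err != nil {
//			log.Fatal(err)
//		}
//		defer nm.Close()
//
//		// Look up a needle id; ok is false when the id is absent or the stored value is malformed.
//		if v, ok := nm.Get(types.NeedleId(0x1234)); ok {
//			fmt.Printf("needle %d -> offset %v, size %d\n", v.Key, v.Offset, v.Size)
//		}
//	}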