@ -9,6 +9,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
@ -17,10 +19,16 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
//mark it every watermarkBatchSize operations
const watermarkBatchSize = 10000
var watermarkKey = [ ] byte ( "idx_entry_watermark" )
type LevelDbNeedleMap struct {
baseNeedleMapper
dbFileName string
db * leveldb . DB
dbFileName string
db * leveldb . DB
recordCount uint64
}
func NewLevelDbNeedleMap ( dbFileName string , indexFile * os . File , opts * opt . Options ) ( m * LevelDbNeedleMap , err error ) {
@ -46,7 +54,14 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
return
}
}
glog . V ( 1 ) . Infof ( "Loading %s..." , indexFile . Name ( ) )
glog . V ( 0 ) . Infof ( "Loading %s... , watermark: %d" , dbFileName , getWatermark ( m . db ) )
m . recordCount = uint64 ( m . indexFileOffset / types . NeedleMapEntrySize )
watermark := ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
err = setWatermark ( m . db , watermark )
if err != nil {
glog . Fatalf ( "set watermark for %s error: %s\n" , dbFileName , err )
return
}
mm , indexLoadError := newNeedleMapMetricFromIndexFile ( indexFile )
if indexLoadError != nil {
return nil , indexLoadError
@ -78,9 +93,20 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err
}
defer db . Close ( )
return idx . WalkIndexFile ( indexFile , func ( key NeedleId , offset Offset , size Size ) error {
watermark := getWatermark ( db )
if stat , err := indexFile . Stat ( ) ; err != nil {
glog . Fatalf ( "stat file %s: %v" , indexFile . Name ( ) , err )
return err
} else {
if watermark * types . NeedleMapEntrySize > uint64 ( stat . Size ( ) ) {
glog . Warningf ( "wrong watermark %d for filesize %d" , watermark , stat . Size ( ) )
}
glog . V ( 0 ) . Infof ( "generateLevelDbFile %s, watermark %d, num of entries:%d" , dbFileName , watermark , ( uint64 ( stat . Size ( ) ) - watermark * types . NeedleMapEntrySize ) / types . NeedleMapEntrySize )
}
return idx . WalkIndexFile ( indexFile , watermark , func ( key NeedleId , offset Offset , size Size ) error {
if ! offset . IsZero ( ) && size . IsValid ( ) {
levelDbWrite ( db , key , offset , size )
levelDbWrite ( db , key , offset , size , false , 0 )
} else {
levelDbDelete ( db , key )
}
@ -102,6 +128,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
func ( m * LevelDbNeedleMap ) Put ( key NeedleId , offset Offset , size Size ) error {
var oldSize Size
var watermark uint64
if oldNeedle , ok := m . Get ( key ) ; ok {
oldSize = oldNeedle . Size
}
@ -110,16 +137,54 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
if err := m . appendToIndexFile ( key , offset , size ) ; err != nil {
return fmt . Errorf ( "cannot write to indexfile %s: %v" , m . indexFile . Name ( ) , err )
}
return levelDbWrite ( m . db , key , offset , size )
m . recordCount ++
if m . recordCount % watermarkBatchSize != 0 {
watermark = 0
} else {
watermark = ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
glog . V ( 1 ) . Infof ( "put cnt:%d for %s,watermark: %d" , m . recordCount , m . dbFileName , watermark )
}
return levelDbWrite ( m . db , key , offset , size , watermark == 0 , watermark )
}
func getWatermark ( db * leveldb . DB ) uint64 {
data , err := db . Get ( watermarkKey , nil )
if err != nil || len ( data ) != 8 {
glog . Warningf ( "get watermark from db error: %v, %d" , err , len ( data ) )
/ *
if ! strings . Contains ( strings . ToLower ( err . Error ( ) ) , "not found" ) {
err = setWatermark ( db , 0 )
if err != nil {
glog . Errorf ( "failed to set watermark: %v" , err )
}
}
* /
return 0
}
return util . BytesToUint64 ( data )
}
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size ) error {
func setWatermark ( db * leveldb . DB , watermark uint64 ) error {
glog . V ( 1 ) . Infof ( "set watermark %d" , watermark )
var wmBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( wmBytes , watermark )
if err := db . Put ( watermarkKey , wmBytes , nil ) ; err != nil {
return fmt . Errorf ( "failed to setWatermark: %v" , err )
}
return nil
}
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size , updateWatermark bool , watermark uint64 ) error {
bytes := needle_map . ToBytes ( key , offset , size )
if err := db . Put ( bytes [ 0 : NeedleIdSize ] , bytes [ NeedleIdSize : NeedleIdSize + OffsetSize + SizeSize ] , nil ) ; err != nil {
return fmt . Errorf ( "failed to write leveldb: %v" , err )
}
// set watermark
if updateWatermark {
return setWatermark ( db , watermark )
}
return nil
}
func levelDbDelete ( db * leveldb . DB , key NeedleId ) error {
@ -129,6 +194,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func ( m * LevelDbNeedleMap ) Delete ( key NeedleId , offset Offset ) error {
var watermark uint64
oldNeedle , found := m . Get ( key )
if ! found || oldNeedle . Size . IsDeleted ( ) {
return nil
@ -139,8 +205,13 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if err := m . appendToIndexFile ( key , offset , TombstoneFileSize ) ; err != nil {
return err
}
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size )
m . recordCount ++
if m . recordCount % watermarkBatchSize != 0 {
watermark = 0
} else {
watermark = ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
}
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size , watermark == 0 , watermark )
}
func ( m * LevelDbNeedleMap ) Close ( ) {