@ -19,15 +19,16 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
//mark it every milestoneCnt operations
const milestoneCnt = 10000
const milestoneKey = 0xffffffffffffffff - 1
//mark it every watermarkBatchSize operations
const watermarkBatchSize = 10000
var watermarkKey = [ ] byte ( "idx_entry_watermark" )
type LevelDbNeedleMap struct {
baseNeedleMapper
dbFileName string
db * leveldb . DB
recordNum uint64
recordCount uint64
}
func NewLevelDbNeedleMap ( dbFileName string , indexFile * os . File , opts * opt . Options ) ( m * LevelDbNeedleMap , err error ) {
@ -53,12 +54,12 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
return
}
}
glog . V ( 1 ) . Infof ( "Loading %s... , milestone : %d" , dbFileName , getMileStone ( m . db ) )
m . recordNum = uint64 ( m . indexFileOffset / types . NeedleMapEntrySize )
milestone := ( m . recordNum / milestoneCnt ) * milestoneCnt
err = setMileStone ( m . db , milestone )
glog . V ( 0 ) . Infof ( "Loading %s... , watermark : %d" , dbFileName , getWatermark ( m . db ) )
m . recordCount = uint64 ( m . indexFileOffset / types . NeedleMapEntrySize )
watermark := ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
err = setWatermark ( m . db , watermark )
if err != nil {
glog . Fatalf ( "set milestone for %s error: %s\n" , dbFileName , err )
glog . Fatalf ( "set watermark for %s error: %s\n" , dbFileName , err )
return
}
mm , indexLoadError := newNeedleMapMetricFromIndexFile ( indexFile )
@ -93,17 +94,17 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
}
defer db . Close ( )
milestone := getMileStone ( db )
watermark := getWatermark ( db )
if stat , err := indexFile . Stat ( ) ; err != nil {
glog . Fatalf ( "stat file %s: %v" , indexFile . Name ( ) , err )
return err
} else {
if milestone * types . NeedleMapEntrySize > uint64 ( stat . Size ( ) ) {
glog . Warningf ( "wrong milestone %d for filesize %d" , milestone , stat . Size ( ) )
if watermark * types . NeedleMapEntrySize > uint64 ( stat . Size ( ) ) {
glog . Warningf ( "wrong watermark %d for filesize %d" , watermark , stat . Size ( ) )
}
glog . V ( 0 ) . Infof ( "generateLevelDbFile %s, milestone %d, num of entries:%d" , dbFileName , milestone , ( uint64 ( stat . Size ( ) ) - milestone * types . NeedleMapEntrySize ) / types . NeedleMapEntrySize )
glog . V ( 0 ) . Infof ( "generateLevelDbFile %s, watermark %d, num of entries:%d" , dbFileName , watermark , ( uint64 ( stat . Size ( ) ) - watermark * types . NeedleMapEntrySize ) / types . NeedleMapEntrySize )
}
return idx . WalkIndexFileIncrement ( indexFile , milestone , func ( key NeedleId , offset Offset , size Size ) error {
return idx . WalkIndexFile ( indexFile , watermark , func ( key NeedleId , offset Offset , size Size ) error {
if ! offset . IsZero ( ) && size . IsValid ( ) {
levelDbWrite ( db , key , offset , size , false , 0 )
} else {
@ -127,7 +128,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
func ( m * LevelDbNeedleMap ) Put ( key NeedleId , offset Offset , size Size ) error {
var oldSize Size
var milestone uint64
var watermark uint64
if oldNeedle , ok := m . Get ( key ) ; ok {
oldSize = oldNeedle . Size
}
@ -136,27 +137,25 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
if err := m . appendToIndexFile ( key , offset , size ) ; err != nil {
return fmt . Errorf ( "cannot write to indexfile %s: %v" , m . indexFile . Name ( ) , err )
}
m . recordNum ++
if m . recordNum % milestoneCnt != 0 {
milestone = 0
m . recordCount ++
if m . recordCount % watermarkBatchSize != 0 {
watermark = 0
} else {
milestone = ( m . recordNum / milestoneCnt ) * milestoneCnt
glog . V ( 1 ) . Infof ( "put cnt:%d for %s,milestone : %d" , m . recordNum , m . dbFileName , milestone )
watermark = ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
glog . V ( 1 ) . Infof ( "put cnt:%d for %s,watermark : %d" , m . recordCount , m . dbFileName , watermark )
}
return levelDbWrite ( m . db , key , offset , size , milestone == 0 , milestone )
return levelDbWrite ( m . db , key , offset , size , watermark == 0 , watermark )
}
func getMileStone ( db * leveldb . DB ) uint64 {
var mskBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( mskBytes , milestoneKey )
data , err := db . Get ( mskBytes , nil )
func getWatermark ( db * leveldb . DB ) uint64 {
data , err := db . Get ( watermarkKey , nil )
if err != nil || len ( data ) != 8 {
glog . Warningf ( "get milestone from db error: %v, %d" , err , len ( data ) )
glog . Warningf ( "get watermark from db error: %v, %d" , err , len ( data ) )
/ *
if ! strings . Contains ( strings . ToLower ( err . Error ( ) ) , "not found" ) {
err = setMileStone ( db , 0 )
err = setWatermark ( db , 0 )
if err != nil {
glog . Errorf ( "failed to set milestone : %v" , err )
glog . Errorf ( "failed to set watermark : %v" , err )
}
}
* /
@ -165,28 +164,26 @@ func getMileStone(db *leveldb.DB) uint64 {
return util . BytesToUint64 ( data )
}
func setMileStone ( db * leveldb . DB , milestone uint64 ) error {
glog . V ( 1 ) . Infof ( "set milestone %d" , milestone )
var mskBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( mskBytes , milestoneKey )
var msBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( msBytes , milestone )
if err := db . Put ( mskBytes , msBytes , nil ) ; err != nil {
return fmt . Errorf ( "failed to setMileStone: %v" , err )
func setWatermark ( db * leveldb . DB , watermark uint64 ) error {
glog . V ( 1 ) . Infof ( "set watermark %d" , watermark )
var wmBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( wmBytes , watermark )
if err := db . Put ( watermarkKey , wmBytes , nil ) ; err != nil {
return fmt . Errorf ( "failed to setWatermark: %v" , err )
}
return nil
}
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size , upateMilstone bool , milestone uint64 ) error {
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size , updateWatermark bool , watermark uint64 ) error {
bytes := needle_map . ToBytes ( key , offset , size )
if err := db . Put ( bytes [ 0 : NeedleIdSize ] , bytes [ NeedleIdSize : NeedleIdSize + OffsetSize + SizeSize ] , nil ) ; err != nil {
return fmt . Errorf ( "failed to write leveldb: %v" , err )
}
// set milestone
if upateMilstone {
return setMileStone ( db , milestone )
// set watermark
if updateWatermark {
return setWatermark ( db , watermark )
}
return nil
}
@ -197,7 +194,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func ( m * LevelDbNeedleMap ) Delete ( key NeedleId , offset Offset ) error {
var milestone uint64
var watermark uint64
oldNeedle , found := m . Get ( key )
if ! found || oldNeedle . Size . IsDeleted ( ) {
return nil
@ -208,13 +205,13 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if err := m . appendToIndexFile ( key , offset , TombstoneFileSize ) ; err != nil {
return err
}
m . recordNum ++
if m . recordNum % milestoneCnt != 0 {
milestone = 0
m . recordCount ++
if m . recordCount % watermarkBatchSize != 0 {
watermark = 0
} else {
milestone = ( m . recordNum / milestoneCnt ) * milestoneCnt
watermark = ( m . recordCount / watermarkBatchSize ) * watermarkBatchSize
}
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size , milestone == 0 , milestone )
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size , watermark == 0 , watermark )
}
func ( m * LevelDbNeedleMap ) Close ( ) {