@ -4,11 +4,14 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
@ -17,26 +20,36 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
//use "2 >> 16" to reduce cpu cost
const milestoneCnt = 40
const milestoneKey = 0xffffffffffffffff - 1
type LevelDbNeedleMap struct {
baseNeedleMapper
dbFileName string
db * leveldb . DB
recordNum uint64
}
func NewLevelDbNeedleMap ( dbFileName string , indexFile * os . File , opts * opt . Options ) ( m * LevelDbNeedleMap , err error ) {
glog . V ( 0 ) . Infof ( "NewLevelDbNeedleMap pocessing %s..." , indexFile . Name ( ) )
db , errd := leveldb . OpenFile ( dbFileName , opts )
glog . V ( 0 ) . Infof ( "begain %v %s %d" , errd , dbFileName , getMileStone ( db ) )
db . Close ( )
m = & LevelDbNeedleMap { dbFileName : dbFileName }
m . indexFile = indexFile
if ! isLevelDbFresh ( dbFileName , indexFile ) {
glog . V ( 1 ) . Infof ( "Start to Generate %s from %s" , dbFileName , indexFile . Name ( ) )
glog . V ( 0 ) . Infof ( "Start to Generate %s from %s" , dbFileName , indexFile . Name ( ) )
generateLevelDbFile ( dbFileName , indexFile )
glog . V ( 1 ) . Infof ( "Finished Generating %s from %s" , dbFileName , indexFile . Name ( ) )
glog . V ( 0 ) . Infof ( "Finished Generating %s from %s" , dbFileName , indexFile . Name ( ) )
}
if stat , err := indexFile . Stat ( ) ; err != nil {
glog . Fatalf ( "stat file %s: %v" , indexFile . Name ( ) , err )
} else {
m . indexFileOffset = stat . Size ( )
}
glog . V ( 1 ) . Infof ( "Opening %s..." , dbFileName )
glog . V ( 0 ) . Infof ( "Opening %s..." , dbFileName )
if m . db , err = leveldb . OpenFile ( dbFileName , opts ) ; err != nil {
if errors . IsCorrupted ( err ) {
@ -46,11 +59,19 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
return
}
}
glog . V ( 1 ) . Infof ( "Loading %s..." , indexFile . Name ( ) )
glog . V ( 0 ) . Infof ( "getMileStone %s : %d" , dbFileName , getMileStone ( m . db ) )
m . recordNum = uint64 ( m . indexFileOffset / types . NeedleMapEntrySize )
milestone := ( m . recordNum / milestoneCnt ) * milestoneCnt
err = setMileStone ( m . db , milestone )
if err != nil {
return
}
glog . V ( 0 ) . Infof ( "Loading %s... %d %d" , indexFile . Name ( ) , milestone , getMileStone ( m . db ) )
mm , indexLoadError := newNeedleMapMetricFromIndexFile ( indexFile )
if indexLoadError != nil {
return nil , indexLoadError
}
glog . V ( 0 ) . Infof ( "finish Loading %s..." , indexFile . Name ( ) )
m . mapMetric = * mm
return
}
@ -78,9 +99,21 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err
}
defer db . Close ( )
return idx . WalkIndexFile ( indexFile , func ( key NeedleId , offset Offset , size Size ) error {
milestone := getMileStone ( db )
if stat , err := indexFile . Stat ( ) ; err != nil {
glog . Fatalf ( "stat file %s: %v" , indexFile . Name ( ) , err )
return err
} else {
if milestone * types . NeedleMapEntrySize > uint64 ( stat . Size ( ) ) {
glog . Warningf ( "wrong milestone %d for filesize %d, set milestone to 0" , milestone , stat . Size ( ) )
milestone = 0
}
glog . V ( 0 ) . Infof ( "generateLevelDbFile %s, milestone %d, num of entries:%d" , dbFileName , milestone , ( uint64 ( stat . Size ( ) ) - milestone * types . NeedleMapEntrySize ) / types . NeedleMapEntrySize )
}
return idx . WalkIndexFileIncrent ( indexFile , milestone , func ( key NeedleId , offset Offset , size Size ) error {
if ! offset . IsZero ( ) && size . IsValid ( ) {
levelDbWrite ( db , key , offset , size )
levelDbWrite ( db , key , offset , size , 0 )
} else {
levelDbDelete ( db , key )
}
@ -102,6 +135,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
func ( m * LevelDbNeedleMap ) Put ( key NeedleId , offset Offset , size Size ) error {
var oldSize Size
var milestone uint64
if oldNeedle , ok := m . Get ( key ) ; ok {
oldSize = oldNeedle . Size
}
@ -110,16 +144,61 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
if err := m . appendToIndexFile ( key , offset , size ) ; err != nil {
return fmt . Errorf ( "cannot write to indexfile %s: %v" , m . indexFile . Name ( ) , err )
}
return levelDbWrite ( m . db , key , offset , size )
//atomic.AddUint64(&m.recordNum, 1)
//milestone = atomic.LoadUint64(&m.recordNum)
m . recordNum ++
if m . recordNum % milestoneCnt != 0 {
milestone = 0
} else {
milestone = ( m . recordNum / milestoneCnt ) * milestoneCnt
glog . V ( 0 ) . Infof ( "put cnt:%d milestone:%s %d" , m . recordNum , m . dbFileName , milestone )
}
return levelDbWrite ( m . db , key , offset , size , milestone )
}
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size ) error {
func getMileStone ( db * leveldb . DB ) uint64 {
var mskBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( mskBytes , milestoneKey )
data , err := db . Get ( mskBytes , nil )
if err != nil || len ( data ) != 8 {
glog . Warningf ( "get milestone from db error: %v, %d" , err , len ( data ) )
if ! strings . Contains ( strings . ToLower ( err . Error ( ) ) , "not found" ) {
err = setMileStone ( db , 0 )
if err != nil {
glog . Errorf ( "failed to set milestone: %v" , err )
}
}
return 0
}
return util . BytesToUint64 ( data )
}
func setMileStone ( db * leveldb . DB , milestone uint64 ) error {
glog . V ( 0 ) . Infof ( "set milestone %d" , milestone )
var mskBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( mskBytes , milestoneKey )
var msBytes = make ( [ ] byte , 8 )
util . Uint64toBytes ( msBytes , milestone )
if err := db . Put ( mskBytes , msBytes , nil ) ; err != nil {
return fmt . Errorf ( "failed to setMileStone: %v" , err )
}
glog . V ( 0 ) . Infof ( "ssset milestone %d, %d" , milestone , getMileStone ( db ) )
return nil
}
func levelDbWrite ( db * leveldb . DB , key NeedleId , offset Offset , size Size , milestone uint64 ) error {
bytes := needle_map . ToBytes ( key , offset , size )
if err := db . Put ( bytes [ 0 : NeedleIdSize ] , bytes [ NeedleIdSize : NeedleIdSize + OffsetSize + SizeSize ] , nil ) ; err != nil {
return fmt . Errorf ( "failed to write leveldb: %v" , err )
}
// set milestone
if milestone != 0 {
glog . V ( 0 ) . Infof ( "actually set milestone %d" , milestone )
return setMileStone ( db , milestone )
}
return nil
}
func levelDbDelete ( db * leveldb . DB , key NeedleId ) error {
@ -129,6 +208,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func ( m * LevelDbNeedleMap ) Delete ( key NeedleId , offset Offset ) error {
var milestone uint64
oldNeedle , found := m . Get ( key )
if ! found || oldNeedle . Size . IsDeleted ( ) {
return nil
@ -139,8 +219,16 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if err := m . appendToIndexFile ( key , offset , TombstoneFileSize ) ; err != nil {
return err
}
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size )
//atomic.AddUint64(&m.recordNum, 1)
//milestone = atomic.LoadUint64(&m.recordNum)
m . recordNum ++
if m . recordNum % milestoneCnt != 0 {
milestone = 0
} else {
milestone = ( m . recordNum / milestoneCnt ) * milestoneCnt
}
glog . V ( 0 ) . Infof ( "delete cnt:%d milestone:%s %d" , m . recordNum , m . dbFileName , milestone )
return levelDbWrite ( m . db , key , oldNeedle . Offset , - oldNeedle . Size , milestone )
}
func ( m * LevelDbNeedleMap ) Close ( ) {