Browse Source

change btree map to in memory level db

pull/1171/head
Chris Lu 5 years ago
parent
commit
abffe857a1
  1. 14
      weed/command/export.go
  2. 10
      weed/command/fix.go
  3. 53
      weed/storage/needle_map/btree_map.go
  4. 112
      weed/storage/needle_map/memdb.go
  5. 13
      weed/storage/needle_map_memory.go
  6. 30
      weed/storage/needle_map_metric_test.go
  7. 32
      weed/storage/volume_vacuum.go

14
weed/command/export.go

@ -16,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
@ -89,7 +90,7 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
type VolumeFileScanner4Export struct { type VolumeFileScanner4Export struct {
version needle.Version version needle.Version
counter int counter int
needleMap *storage.NeedleMap
needleMap *needle_map.MemDb
vid needle.VolumeId vid needle.VolumeId
} }
@ -192,15 +193,10 @@ func runExport(cmd *Command, args []string) bool {
fileName = *export.collection + "_" + fileName fileName = *export.collection + "_" + fileName
} }
vid := needle.VolumeId(*export.volumeId) vid := needle.VolumeId(*export.volumeId)
indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
if err != nil {
glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
}
defer indexFile.Close()
needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
if err != nil {
glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
needleMap := needle_map.NewMemDb()
if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
} }
volumeFileScanner := &VolumeFileScanner4Export{ volumeFileScanner := &VolumeFileScanner4Export{

10
weed/command/fix.go

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
@ -32,7 +33,7 @@ var (
type VolumeFileScanner4Fix struct { type VolumeFileScanner4Fix struct {
version needle.Version version needle.Version
nm *storage.NeedleMap
nm *needle_map.MemDb
} }
func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@ -47,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize { if n.Size > 0 && n.Size != types.TombstoneFileSize {
pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe) glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else { } else {
glog.V(2).Infof("skipping deleted file ...") glog.V(2).Infof("skipping deleted file ...")
return scanner.nm.Delete(n.Id, types.ToOffset(offset))
return scanner.nm.Delete(n.Id)
} }
return nil return nil
} }
@ -73,8 +74,7 @@ func runFix(cmd *Command, args []string) bool {
} }
defer indexFile.Close() defer indexFile.Close()
nm := storage.NewBtreeNeedleMap(indexFile)
defer nm.Close()
nm := needle_map.NewMemDb()
vid := needle.VolumeId(*fixVolumeId) vid := needle.VolumeId(*fixVolumeId)
scanner := &VolumeFileScanner4Fix{ scanner := &VolumeFileScanner4Fix{

53
weed/storage/needle_map/btree_map.go

@ -1,53 +0,0 @@
package needle_map
import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/google/btree"
)
// BtreeMap is a needle map backed by a github.com/google/btree B-tree.
// This map assumes mostly inserting increasing keys.
type BtreeMap struct {
	// tree stores NeedleValue items.
	tree *btree.BTree
}
// NewBtreeMap returns an empty BtreeMap whose underlying B-tree is created
// with btree.New(32).
func NewBtreeMap() *BtreeMap {
	m := new(BtreeMap)
	m.tree = btree.New(32)
	return m
}
// Set stores offset and size under key. When key was already present, the
// offset and size it previously held are returned; otherwise both results
// are their zero values.
func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
	replaced := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size})
	if replaced == nil {
		return
	}
	previous := replaced.(NeedleValue)
	oldOffset, oldSize = previous.Offset, previous.Size
	return
}
// Delete removes key from the map and returns the size that was stored for
// it, or 0 when the key was not present.
func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) {
	removed := cm.tree.Delete(NeedleValue{key, Offset{}, 0})
	if removed == nil {
		return 0
	}
	return removed.(NeedleValue).Size
}
// Get looks up key and returns a pointer to a copy of the stored value with
// true, or nil and false when the key is not present.
func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) {
	item := cm.tree.Get(NeedleValue{key, Offset{}, 0})
	if item == nil {
		return nil, false
	}
	value := item.(NeedleValue)
	return &value, true
}
// AscendingVisit passes every entry to visit using the tree's Ascend
// traversal. It stops at the first non-nil error returned by visit and
// returns that error; otherwise it returns nil.
func (cm *BtreeMap) AscendingVisit(visit func(NeedleValue) error) (ret error) {
	step := func(item btree.Item) bool {
		ret = visit(item.(NeedleValue))
		// Returning false tells Ascend to stop iterating.
		return ret == nil
	}
	cm.tree.Ascend(step)
	return ret
}

112
weed/storage/needle_map/memdb.go

@ -0,0 +1,112 @@
package needle_map
import (
"fmt"
"os"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
// MemDb is a needle map that uses an in-memory leveldb.
// Each entry is keyed by the NeedleIdSize-byte encoding of the needle id and
// holds the encoded offset followed by the encoded size.
type MemDb struct {
	// db is the leveldb handle that stores the entries.
	db *leveldb.DB
}
// NewMemDb opens a leveldb on in-memory storage and wraps it in a MemDb.
// If the database cannot be opened, the failure is logged and nil is
// returned, so callers must be prepared for a nil result.
func NewMemDb() *MemDb {
	db, err := leveldb.Open(storage.NewMemStorage(), &opt.Options{})
	if err != nil {
		glog.V(0).Infof("MemDb fails to open: %v", err)
		return nil
	}
	return &MemDb{db: db}
}
// Set stores offset and size under key, replacing any existing entry.
// The serialized entry is split into the needle id bytes (used as the
// leveldb key) and the offset+size bytes (used as the leveldb value).
func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error {
	entry := ToBytes(key, offset, size)
	dbKey := entry[:NeedleIdSize]
	dbValue := entry[NeedleIdSize : NeedleIdSize+OffsetSize+SizeSize]

	err := cm.db.Put(dbKey, dbValue, nil)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to write temp leveldb: %v", err)
}
// Delete removes the entry stored under key and returns whatever error the
// underlying leveldb Delete reports.
func (cm *MemDb) Delete(key NeedleId) error {
	dbKey := make([]byte, NeedleIdSize)
	NeedleIdToBytes(dbKey, key)
	err := cm.db.Delete(dbKey, nil)
	return err
}
// Get looks up key and returns its decoded value with true. It returns nil
// and false when the lookup fails or the stored value does not have the
// expected OffsetSize+SizeSize length.
func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
	dbKey := make([]byte, NeedleIdSize)
	NeedleIdToBytes(dbKey, key)

	stored, err := cm.db.Get(dbKey, nil)
	if err != nil {
		return nil, false
	}
	if len(stored) != OffsetSize+SizeSize {
		return nil, false
	}

	value := &NeedleValue{
		Key:    key,
		Offset: BytesToOffset(stored[:OffsetSize]),
		Size:   util.BytesToUint32(stored[OffsetSize : OffsetSize+SizeSize]),
	}
	return value, true
}
// AscendingVisit passes every entry to visit in the order produced by the
// leveldb iterator. It stops at the first non-nil error returned by visit
// and returns that error. If iteration completes, any error accumulated by
// the iterator itself is returned.
//
// The iterator is always released, including when visit fails part-way
// through; the previous implementation leaked it on that path.
func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
	iter := cm.db.NewIterator(nil, nil)
	defer iter.Release()

	for iter.Next() {
		key := BytesToNeedleId(iter.Key())
		data := iter.Value()
		// Mirror the length check done in Get so a malformed entry yields an
		// error instead of an out-of-range panic.
		if len(data) != OffsetSize+SizeSize {
			return fmt.Errorf("unexpected value length %d for needle %v", len(data), key)
		}
		offset := BytesToOffset(data[0:OffsetSize])
		size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
		if ret = visit(NeedleValue{Key: key, Offset: offset, Size: size}); ret != nil {
			return ret
		}
	}
	return iter.Error()
}
// SaveToIdx writes every entry of the map, in AscendingVisit order, to the
// file idxName. The file is created if missing and truncated if present.
//
// It returns the error from opening the file, from the first failed write,
// or from closing the file. The previous implementation returned nil when
// the file could not be opened and ignored the Close error.
func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
	idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// A failed Close on a written file can mean lost data, so report it
		// unless an earlier error is already being returned.
		if closeErr := idxFile.Close(); closeErr != nil && ret == nil {
			ret = closeErr
		}
	}()

	return cm.AscendingVisit(func(value NeedleValue) error {
		_, err := idxFile.Write(value.ToBytes())
		return err
	})
}
// LoadFromIdx replays the index file idxName into the map.
//
// Entries are applied in file order. An entry with a zero offset or a
// tombstone size removes the key, so a needle that was written and later
// deleted in the index does not remain in the map. Every other entry is
// stored with Set.
//
// It returns the error from opening the file or the first error reported
// while walking it. The previous implementation returned nil when the file
// could not be opened.
func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
	idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644)
	if err != nil {
		return err
	}
	defer idxFile.Close()

	return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
		if offset.IsZero() || size == TombstoneFileSize {
			// Deletion marker: drop any earlier entry for this key.
			return cm.Delete(key)
		}
		return cm.Set(key, offset, size)
	})
}

13
weed/storage/needle_map_memory.go

@ -22,24 +22,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
return nm return nm
} }
func NewBtreeNeedleMap(file *os.File) *NeedleMap {
nm := &NeedleMap{
m: needle_map.NewBtreeMap(),
}
nm.indexFile = file
return nm
}
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewCompactNeedleMap(file) nm := NewCompactNeedleMap(file)
return doLoading(file, nm) return doLoading(file, nm)
} }
func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewBtreeNeedleMap(file)
return doLoading(file, nm)
}
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
nm.MaybeSetMaxFileKey(key) nm.MaybeSetMaxFileKey(key)

30
weed/storage/needle_map_metric_test.go

@ -1,30 +0,0 @@
package storage
import (
"github.com/chrislusf/seaweedfs/weed/glog"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"math/rand"
"testing"
)
// TestFastLoadingNeedleMapMetrics fills a btree needle map with 10000 puts
// and random deletes, then loads metrics from the same index file and logs
// both sets of counters side by side for comparison.
func TestFastLoadingNeedleMapMetrics(t *testing.T) {
	idxFile, err := ioutil.TempFile("", "tmp.idx")
	if err != nil {
		t.Fatalf("create temp index file: %v", err)
	}
	defer idxFile.Close()

	nm := NewBtreeNeedleMap(idxFile)

	for i := 0; i < 10000; i++ {
		if err := nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1)); err != nil {
			t.Fatalf("put needle %d: %v", i+1, err)
		}
		if rand.Float32() < 0.2 {
			// Keys 1..i+1 have been written at this point. Int63n panics on
			// a non-positive argument, so the bound must be i+1, not i.
			victim := uint64(rand.Int63n(int64(i+1)) + 1)
			if err := nm.Delete(Uint64ToNeedleId(victim), Uint32ToOffset(uint32(0))); err != nil {
				t.Fatalf("delete needle %d: %v", victim, err)
			}
		}
	}

	mm, err := newNeedleMapMetricFromIndexFile(idxFile)
	if err != nil {
		t.Fatalf("load metrics from index file: %v", err)
	}

	glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount())
	glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize())
	glog.V(0).Infof("ContentSize expected %d actual %d", nm.ContentSize(), mm.ContentSize())
	glog.V(0).Infof("DeletedCount expected %d actual %d", nm.DeletedCount(), mm.DeletedCount())
	glog.V(0).Infof("MaxFileKey expected %d actual %d", nm.MaxFileKey(), mm.MaxFileKey())
}

32
weed/storage/volume_vacuum.go

@ -115,6 +115,7 @@ func (v *Volume) CommitCompact() error {
if e = v.load(true, false, v.needleMapKind, 0); e != nil { if e = v.load(true, false, v.needleMapKind, 0); e != nil {
return e return e
} }
return nil
} }
func (v *Volume) cleanupCompact() error { func (v *Volume) cleanupCompact() error {
@ -270,7 +271,7 @@ type VolumeFileScanner4Vacuum struct {
version needle.Version version needle.Version
v *Volume v *Volume
dstBackend backend.BackendStorageFile dstBackend backend.BackendStorageFile
nm *NeedleMap
nm *needle_map.MemDb
newOffset int64 newOffset int64
now uint64 now uint64
writeThrottler *util.WriteThrottler writeThrottler *util.WriteThrottler
@ -295,7 +296,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
nv, ok := scanner.v.nm.Get(n.Id) nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize { if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil { if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
@ -312,32 +313,33 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var ( var (
dst backend.BackendStorageFile dst backend.BackendStorageFile
idx *os.File
) )
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil { if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return return
} }
defer dst.Close() defer dst.Close()
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
defer idx.Close()
nm := needle_map.NewMemDb()
scanner := &VolumeFileScanner4Vacuum{ scanner := &VolumeFileScanner4Vacuum{
v: v, v: v,
now: uint64(time.Now().Unix()), now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx),
nm: nm,
dstBackend: dst, dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
} }
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
if err != nil {
return nil
}
err = nm.SaveToIdx(idxName)
return return
} }
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
var ( var (
dst, idx, oldIndexFile *os.File
dst, oldIndexFile *os.File
) )
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return return
@ -345,17 +347,13 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
dstDatBackend := backend.NewDiskFile(dst) dstDatBackend := backend.NewDiskFile(dst)
defer dstDatBackend.Close() defer dstDatBackend.Close()
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
defer idx.Close()
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil { if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
return return
} }
defer oldIndexFile.Close() defer oldIndexFile.Close()
nm := NewBtreeNeedleMap(idx)
nm := needle_map.NewMemDb()
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
v.SuperBlock.CompactionRevision++ v.SuperBlock.CompactionRevision++
@ -384,7 +382,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 { if nv.Offset == offset && nv.Size > 0 {
if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
@ -396,5 +394,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
return nil return nil
}) })
nm.SaveToIdx(idxName)
return return
} }
Loading…
Cancel
Save