Browse Source

add btree for volume index

pull/357/merge
Chris Lu 8 years ago
parent
commit
82c3ccc8dd
  1. 2
      weed/command/export.go
  2. 2
      weed/command/fix.go
  3. 4
      weed/command/server.go
  4. 4
      weed/command/volume.go
  5. 52
      weed/storage/needle/btree_map.go
  6. 194
      weed/storage/needle/compact_map.go
  7. 9
      weed/storage/needle/compact_map_perf_test.go
  8. 2
      weed/storage/needle/compact_map_test.go
  9. 28
      weed/storage/needle/needle_value.go
  10. 8
      weed/storage/needle/needle_value_map.go
  11. 4
      weed/storage/needle_map.go
  12. 7
      weed/storage/needle_map_boltdb.go
  13. 7
      weed/storage/needle_map_leveldb.go
  14. 40
      weed/storage/needle_map_memory.go
  15. 15
      weed/storage/volume_loading.go
  16. 27
      weed/storage/volume_sync.go
  17. 4
      weed/storage/volume_vacuum.go

2
weed/command/export.go

@ -118,7 +118,7 @@ func runExport(cmd *Command, args []string) bool {
} }
defer indexFile.Close() defer indexFile.Close()
needleMap, err := storage.LoadNeedleMap(indexFile)
needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
if err != nil { if err != nil {
glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
} }

2
weed/command/fix.go

@ -43,7 +43,7 @@ func runFix(cmd *Command, args []string) bool {
} }
defer indexFile.Close() defer indexFile.Close()
nm := storage.NewNeedleMap(indexFile)
nm := storage.NewBtreeNeedleMap(indexFile)
defer nm.Close() defer nm.Close()
vid := storage.VolumeId(*fixVolumeId) vid := storage.VolumeId(*fixVolumeId)

4
weed/command/server.go

@ -72,7 +72,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
@ -276,6 +276,8 @@ func runServer(cmd *Command, args []string) bool {
volumeNeedleMapKind = storage.NeedleMapLevelDb volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb": case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb volumeNeedleMapKind = storage.NeedleMapBoltDb
case "btree":
volumeNeedleMapKind = storage.NeedleMapBtree
} }
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*serverIp, *volumePort, *volumeServerPublicUrl, *serverIp, *volumePort, *volumeServerPublicUrl,

4
weed/command/volume.go

@ -52,7 +52,7 @@ func init() {
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.") v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.")
@ -126,6 +126,8 @@ func runVolume(cmd *Command, args []string) bool {
volumeNeedleMapKind = storage.NeedleMapLevelDb volumeNeedleMapKind = storage.NeedleMapLevelDb
case "boltdb": case "boltdb":
volumeNeedleMapKind = storage.NeedleMapBoltDb volumeNeedleMapKind = storage.NeedleMapBoltDb
case "btree":
volumeNeedleMapKind = storage.NeedleMapBtree
} }
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl, *v.ip, *v.port, *v.publicUrl,

52
weed/storage/needle/btree_map.go

@ -0,0 +1,52 @@
package needle
import (
"github.com/google/btree"
)
// BtreeMap is a needle index that keeps its NeedleValue entries in a
// github.com/google/btree B-tree. Entries are ordered by Key through
// NeedleValue.Less. The type itself performs no locking.
type BtreeMap struct {
// tree holds one NeedleValue item per key.
tree *btree.BTree
}
// NewBtreeMap returns an empty BtreeMap whose underlying B-tree is created
// with degree 32.
func NewBtreeMap() *BtreeMap {
	m := new(BtreeMap)
	m.tree = btree.New(32)
	return m
}
// Set stores offset and size under key, replacing any entry already present.
// It returns the offset and size of the replaced entry, or zeros when the key
// was not in the tree.
func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
	replaced := cm.tree.ReplaceOrInsert(NeedleValue{Key: key, Offset: offset, Size: size})
	if replaced == nil {
		return 0, 0
	}
	previous := replaced.(NeedleValue)
	return previous.Offset, previous.Size
}
// Delete removes the entry stored under key and returns its size, or 0 when
// the key was not in the tree.
func (cm *BtreeMap) Delete(key Key) (oldSize uint32) {
	// Only Key takes part in ordering, so a probe with zero offset/size suffices.
	removed := cm.tree.Delete(NeedleValue{Key: key})
	if removed == nil {
		return 0
	}
	return removed.(NeedleValue).Size
}
// Get returns a pointer to a copy of the entry stored under key and true, or
// (nil, false) when the key is not in the tree.
func (cm *BtreeMap) Get(key Key) (*NeedleValue, bool) {
	// Only Key takes part in ordering, so a probe with zero offset/size suffices.
	item := cm.tree.Get(NeedleValue{Key: key})
	if item == nil {
		return nil, false
	}
	value := item.(NeedleValue)
	return &value, true
}
// Visit calls visit for every entry in ascending key order. Iteration stops
// at the first non-nil error, which is returned; nil is returned when every
// entry was visited.
func (cm *BtreeMap) Visit(visit func(NeedleValue) error) (ret error) {
	step := func(item btree.Item) bool {
		if ret = visit(item.(NeedleValue)); ret != nil {
			return false
		}
		return true
	}
	cm.tree.Ascend(step)
	return ret
}

194
weed/storage/needle/compact_map.go

@ -0,0 +1,194 @@
package needle
import (
"sync"
)
// CompactSection is one slice of a CompactMap. Keys that arrive in increasing
// order are appended to values; anything that cannot be appended (section
// full, or key smaller than the last appended key) is kept in overflow.
// The embedded RWMutex guards every field.
type CompactSection struct {
	sync.RWMutex
	// values is pre-allocated; only the first counter slots are in use and
	// those are sorted by Key.
	values []NeedleValue
	// overflow holds the entries that could not be appended to values.
	overflow map[Key]NeedleValue
	// start is the key the section was created with; end is the largest key
	// ever passed to Set.
	start, end Key
	// counter is the number of used slots in values.
	counter int
}
// NewCompactSection returns an empty section beginning at start, with batch
// pre-allocated value slots and an empty overflow map.
func NewCompactSection(start Key) *CompactSection {
	section := new(CompactSection)
	section.start = start
	section.values = make([]NeedleValue, batch)
	section.overflow = make(map[Key]NeedleValue)
	return section
}
// Set stores offset and size under key and returns the offset and size that
// were previously stored for it (zeros when the key is new).
//
// A key already present in the sorted values is overwritten in place. A new
// key is appended to values when there is room and it does not sort before
// the last appended key; otherwise it is written to the overflow map.
func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
	cs.Lock()
	defer cs.Unlock()

	if cs.end < key {
		cs.end = key
	}

	if idx := cs.binarySearchValues(key); idx >= 0 {
		slot := &cs.values[idx]
		oldOffset, oldSize = slot.Offset, slot.Size
		slot.Offset, slot.Size = offset, size
		return oldOffset, oldSize
	}

	// Appending keeps values sorted only if there is a free slot and the new
	// key is not smaller than the last appended one.
	appendable := cs.counter < batch &&
		(cs.counter <= 0 || cs.values[cs.counter-1].Key <= key)
	if appendable {
		cs.values[cs.counter] = NeedleValue{Key: key, Offset: offset, Size: size}
		cs.counter++
		return oldOffset, oldSize
	}

	if previous, ok := cs.overflow[key]; ok {
		oldOffset, oldSize = previous.Offset, previous.Size
	}
	cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
	return oldOffset, oldSize
}
// Delete marks key as removed and returns the size that was stored for it,
// or 0 when there was nothing to remove. An entry in the sorted values keeps
// its slot and only has Size reset to 0; an entry in overflow is dropped from
// the map.
func (cs *CompactSection) Delete(key Key) uint32 {
	cs.Lock()
	defer cs.Unlock()

	var freed uint32
	if idx := cs.binarySearchValues(key); idx >= 0 {
		if slot := &cs.values[idx]; slot.Size > 0 {
			freed = slot.Size
			slot.Size = 0
		}
	}
	if spilled, ok := cs.overflow[key]; ok {
		delete(cs.overflow, key)
		freed = spilled.Size
	}
	return freed
}
// Get looks key up in this section, checking the overflow map first and the
// sorted values second. It returns a pointer to a private copy of the entry
// and true, or (nil, false) when the key is unknown.
//
// An entry removed from the sorted values by Delete keeps its slot with
// Size == 0, so callers may receive such an entry and must inspect Size.
func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
	cs.RLock()
	defer cs.RUnlock()

	if v, ok := cs.overflow[key]; ok {
		return &v, true
	}
	if i := cs.binarySearchValues(key); i >= 0 {
		// Copy while the read lock is held. Handing out &cs.values[i] would
		// let the caller read the slot after the lock is released, racing
		// with Set/Delete, which rewrite it under the write lock.
		v := cs.values[i]
		return &v, true
	}
	return nil, false
}
// binarySearchValues searches the used, sorted part of values for key.
// It returns the slot index when the key is found, -2 when the key is larger
// than the last used slot's key, and -1 otherwise. The caller must hold the
// section lock.
func (cs *CompactSection) binarySearchValues(key Key) int {
	lo, hi := 0, cs.counter-1
	if hi >= 0 && cs.values[hi].Key < key {
		return -2
	}
	for lo <= hi {
		mid := (lo + hi) / 2
		switch probe := cs.values[mid].Key; {
		case probe < key:
			lo = mid + 1
		case key < probe:
			hi = mid - 1
		default:
			return mid
		}
	}
	return -1
}
// CompactMap is a needle index split into CompactSections.
// This map assumes mostly inserting increasing keys.
type CompactMap struct {
// list holds the sections; Set keeps it sorted by each section's start key.
list []*CompactSection
}
// NewCompactMap returns an empty CompactMap; sections are created lazily by Set.
func NewCompactMap() *CompactMap {
	return new(CompactMap)
}
// Set stores offset and size under key in the section responsible for it and
// returns what that section's Set reports as the previous offset and size.
// When no section can take the key, a new one starting at key is created and
// moved into position so that list stays sorted by start.
func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
	idx := cm.binarySearchCompactSection(key)
	if idx < 0 {
		cm.list = append(cm.list, NewCompactSection(key))
		// Bubble the new section towards the front until its predecessor
		// no longer has a larger start.
		for idx = len(cm.list) - 1; idx > 0 && cm.list[idx-1].start > cm.list[idx].start; idx-- {
			cm.list[idx-1], cm.list[idx] = cm.list[idx], cm.list[idx-1]
		}
	}
	return cm.list[idx].Set(key, offset, size)
}
// Delete removes key from the section responsible for it and returns that
// section's result, or 0 when no section covers the key.
func (cm *CompactMap) Delete(key Key) uint32 {
	if idx := cm.binarySearchCompactSection(key); idx >= 0 {
		return cm.list[idx].Delete(key)
	}
	return 0
}
// Get returns the entry for key from the section responsible for it, or
// (nil, false) when no section covers the key.
func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
	if idx := cm.binarySearchCompactSection(key); idx >= 0 {
		return cm.list[idx].Get(key)
	}
	return nil, false
}
// binarySearchCompactSection returns the index of the section that should
// hold key, or a negative code when there is none:
//
//	-5  the map has no sections
//	-4  key belongs after the last section, which is full and ends before key
//	-3  key sorts before the first section's start
//
// The last section is checked first because keys are expected to arrive in
// mostly increasing order.
func (cm *CompactMap) binarySearchCompactSection(key Key) int {
	last := len(cm.list) - 1
	if last < 0 {
		return -5
	}
	if tail := cm.list[last]; tail.start <= key {
		if tail.counter >= batch && key > tail.end {
			return -4
		}
		return last
	}
	// From here on key < cm.list[last].start, so any section whose start is
	// <= key has a successor and indexing mid+1 stays in range.
	lo, hi := 0, last
	for lo <= hi {
		mid := (lo + hi) / 2
		switch {
		case key < cm.list[mid].start:
			hi = mid - 1
		case cm.list[mid+1].start <= key:
			lo = mid + 1
		default:
			return mid
		}
	}
	return -3
}
// Visit calls visit for every entry stored in the map, section by section:
// first the section's overflow entries (in map iteration order), then its
// sorted values. Iteration stops at the first non-nil error, which is
// returned; nil is returned when every entry was visited.
//
// Entries removed from the sorted values by CompactSection.Delete keep their
// slot with Size == 0 and are still visited.
//
// visit runs while the section's read lock is held, so it must not call
// Set or Delete on the same map.
func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
	for _, cs := range cm.list {
		cs.RLock()
		for _, v := range cs.overflow {
			if err := visit(v); err != nil {
				cs.RUnlock()
				return err
			}
		}
		// Only the first counter slots hold data. values is pre-allocated to
		// its full size, and ranging over all of it would report every
		// unused zero-valued slot to the callback.
		for _, v := range cs.values[:cs.counter] {
			if _, found := cs.overflow[v.Key]; !found {
				if err := visit(v); err != nil {
					cs.RUnlock()
					return err
				}
			}
		}
		cs.RUnlock()
	}
	return nil
}

9
weed/storage/compact_map_perf_test.go → weed/storage/needle/compact_map_perf_test.go

@ -1,4 +1,4 @@
package storage
package needle
import ( import (
"log" "log"
@ -11,15 +11,15 @@ import (
func TestMemoryUsage(t *testing.T) { func TestMemoryUsage(t *testing.T) {
indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil { if ie != nil {
log.Fatalln(ie) log.Fatalln(ie)
} }
LoadNewNeedleMap(indexFile)
loadNewNeedleMap(indexFile)
} }
func LoadNewNeedleMap(file *os.File) CompactMap {
func loadNewNeedleMap(file *os.File) {
m := NewCompactMap() m := NewCompactMap()
bytes := make([]byte, 16*1024) bytes := make([]byte, 16*1024)
count, e := file.Read(bytes) count, e := file.Read(bytes)
@ -41,5 +41,4 @@ func LoadNewNeedleMap(file *os.File) CompactMap {
count, e = file.Read(bytes) count, e = file.Read(bytes)
} }
return m
} }

2
weed/storage/compact_map_test.go → weed/storage/needle/compact_map_test.go

@ -1,4 +1,4 @@
package storage
package needle
import ( import (
"testing" "testing"

28
weed/storage/needle/needle_value.go

@ -0,0 +1,28 @@
package needle
import (
"strconv"
"github.com/google/btree"
)
const (
// batch is the number of value slots pre-allocated for each CompactSection;
// a section whose counter reaches batch sends further new keys to overflow.
batch = 100000
)
// NeedleValue is one index entry: a needle key together with the offset and
// size recorded for it. It implements btree.Item through Less, which orders
// entries by Key only.
type NeedleValue struct {
Key Key
Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
Size uint32 `comment:"Size of the data portion"`
}
// Less reports whether nv sorts before than, comparing Key only. It makes
// NeedleValue usable as a btree.Item; than must hold a NeedleValue, otherwise
// the type assertion panics.
func (nv NeedleValue) Less(than btree.Item) bool {
	other := than.(NeedleValue)
	return nv.Key < other.Key
}
// Key is the 64-bit identifier under which a needle is indexed.
type Key uint64

// String returns the key formatted as an unsigned base-10 number.
func (k Key) String() string {
	const decimal = 10
	return strconv.FormatUint(uint64(k), decimal)
}

8
weed/storage/needle/needle_value_map.go

@ -0,0 +1,8 @@
package needle
// NeedleValueMap is the in-memory index abstraction satisfied by both
// CompactMap and BtreeMap.
type NeedleValueMap interface {
// Set stores offset and size under key and returns the values previously
// stored for it (zeros when the key was new).
Set(key Key, offset, size uint32) (oldOffset, oldSize uint32)
// Delete removes key and returns the size that was stored for it, or 0.
Delete(key Key) uint32
// Get returns the entry stored under key, or (nil, false) when absent.
Get(key Key) (*NeedleValue, bool)
// Visit calls visit for the stored entries and stops at the first error,
// which it returns.
Visit(visit func(NeedleValue) error) error
}

4
weed/storage/needle_map.go

@ -6,6 +6,7 @@ import (
"os" "os"
"sync" "sync"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -15,6 +16,7 @@ const (
NeedleMapInMemory NeedleMapType = iota NeedleMapInMemory NeedleMapType = iota
NeedleMapLevelDb NeedleMapLevelDb
NeedleMapBoltDb NeedleMapBoltDb
NeedleMapBtree
) )
const ( const (
@ -23,7 +25,7 @@ const (
type NeedleMapper interface { type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
Get(key uint64) (element *needle.NeedleValue, ok bool)
Delete(key uint64, offset uint32) error Delete(key uint64, offset uint32) error
Close() Close()
Destroy() error Destroy() error

7
weed/storage/needle_map_boltdb.go

@ -7,6 +7,7 @@ import (
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -31,7 +32,7 @@ func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleM
return return
} }
glog.V(1).Infof("Loading %s...", indexFile.Name()) glog.V(1).Infof("Loading %s...", indexFile.Name())
nm, indexLoadError := LoadNeedleMap(indexFile)
nm, indexLoadError := LoadBtreeNeedleMap(indexFile)
if indexLoadError != nil { if indexLoadError != nil {
return nil, indexLoadError return nil, indexLoadError
} }
@ -72,7 +73,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
}) })
} }
func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, 8) bytes := make([]byte, 8)
var data []byte var data []byte
util.Uint64toBytes(bytes, key) util.Uint64toBytes(bytes, key)
@ -91,7 +92,7 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
} }
offset := util.BytesToUint32(data[0:4]) offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8]) size := util.BytesToUint32(data[4:8])
return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
} }
func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {

7
weed/storage/needle_map_leveldb.go

@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
@ -29,7 +30,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl
return return
} }
glog.V(1).Infof("Loading %s...", indexFile.Name()) glog.V(1).Infof("Loading %s...", indexFile.Name())
nm, indexLoadError := LoadNeedleMap(indexFile)
nm, indexLoadError := LoadBtreeNeedleMap(indexFile)
if indexLoadError != nil { if indexLoadError != nil {
return nil, indexLoadError return nil, indexLoadError
} }
@ -70,7 +71,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
}) })
} }
func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, 8) bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key) util.Uint64toBytes(bytes, key)
data, err := m.db.Get(bytes, nil) data, err := m.db.Get(bytes, nil)
@ -79,7 +80,7 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
} }
offset := util.BytesToUint32(data[0:4]) offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8]) size := util.BytesToUint32(data[4:8])
return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
} }
func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {

40
weed/storage/needle_map_memory.go

@ -5,17 +5,26 @@ import (
"os" "os"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
) )
type NeedleMap struct { type NeedleMap struct {
m CompactMap
m needle.NeedleValueMap
baseNeedleMapper baseNeedleMapper
} }
func NewNeedleMap(file *os.File) *NeedleMap {
func NewCompactNeedleMap(file *os.File) *NeedleMap {
nm := &NeedleMap{ nm := &NeedleMap{
m: NewCompactMap(),
m: needle.NewCompactMap(),
}
nm.indexFile = file
return nm
}
func NewBtreeNeedleMap(file *os.File) *NeedleMap {
nm := &NeedleMap{
m: needle.NewBtreeMap(),
} }
nm.indexFile = file nm.indexFile = file
return nm return nm
@ -25,8 +34,17 @@ const (
RowsToRead = 1024 RowsToRead = 1024
) )
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
// LoadCompactNeedleMap creates a CompactMap-backed NeedleMap for file and
// populates it through doLoading.
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
	return doLoading(file, NewCompactNeedleMap(file))
}
// LoadBtreeNeedleMap creates a BtreeMap-backed NeedleMap for file and
// populates it through doLoading.
func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
	return doLoading(file, NewBtreeNeedleMap(file))
}
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
if key > nm.MaximumFileKey { if key > nm.MaximumFileKey {
nm.MaximumFileKey = key nm.MaximumFileKey = key
@ -34,14 +52,14 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
nm.FileCounter++ nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size) nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(Key(key), offset, size)
oldOffset, oldSize := nm.m.Set(needle.Key(key), offset, size)
glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if oldOffset > 0 && oldSize != TombstoneFileSize { if oldOffset > 0 && oldSize != TombstoneFileSize {
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
} else { } else {
oldSize := nm.m.Delete(Key(key))
oldSize := nm.m.Delete(needle.Key(key))
glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
@ -84,16 +102,16 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
} }
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
_, oldSize := nm.m.Set(Key(key), offset, size)
_, oldSize := nm.m.Set(needle.Key(key), offset, size)
nm.logPut(key, oldSize, size) nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size) return nm.appendToIndexFile(key, offset, size)
} }
func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
element, ok = nm.m.Get(Key(key))
func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
element, ok = nm.m.Get(needle.Key(key))
return return
} }
func (nm *NeedleMap) Delete(key uint64, offset uint32) error { func (nm *NeedleMap) Delete(key uint64, offset uint32) error {
deletedBytes := nm.m.Delete(Key(key))
deletedBytes := nm.m.Delete(needle.Key(key))
nm.logDelete(deletedBytes) nm.logDelete(deletedBytes)
return nm.appendToIndexFile(key, offset, TombstoneFileSize) return nm.appendToIndexFile(key, offset, TombstoneFileSize)
} }

15
weed/storage/volume_loading.go

@ -70,20 +70,25 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} }
switch needleMapKind { switch needleMapKind {
case NeedleMapInMemory: case NeedleMapInMemory:
glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
if v.nm, e = LoadNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
} }
case NeedleMapLevelDb: case NeedleMapLevelDb:
glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
glog.V(0).Infoln("loading leveldb", fileName+".ldb")
if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
} }
case NeedleMapBoltDb: case NeedleMapBoltDb:
glog.V(0).Infoln("loading boltdb file", fileName+".bdb")
glog.V(0).Infoln("loading boltdb", fileName+".bdb")
if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil {
glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e)
} }
case NeedleMapBtree:
glog.V(0).Infoln("loading index", fileName+".idx", "to btree readonly", v.readOnly)
if v.nm, e = LoadBtreeNeedleMap(indexFile); e != nil {
glog.V(0).Infof("loading index %s to btree error: %v", fileName+".idx", e)
}
} }
} }

27
weed/storage/volume_sync.go

@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -48,7 +49,7 @@ optimized more later).
func (v *Volume) Synchronize(volumeServer string) (err error) { func (v *Volume) Synchronize(volumeServer string) (err error) {
var lastCompactRevision uint16 = 0 var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0 var compactRevision uint16 = 0
var masterMap CompactMap
var masterMap *needle.CompactMap
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
@ -69,7 +70,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
return return
} }
type ByOffset []NeedleValue
type ByOffset []needle.NeedleValue
func (a ByOffset) Len() int { return len(a) } func (a ByOffset) Len() int { return len(a) }
func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -77,18 +78,18 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing sync with remote volume server incrementally by // trySynchronizing sync with remote volume server incrementally by
// make up the local and remote delta. // make up the local and remote delta.
func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName()) slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil { if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err) return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
} }
defer slaveIdxFile.Close() defer slaveIdxFile.Close()
slaveMap, err := LoadNeedleMap(slaveIdxFile)
slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile)
if err != nil { if err != nil {
return fmt.Errorf("Load volume %d index file: %v", v.Id, err) return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
} }
var delta []NeedleValue
if err := masterMap.Visit(func(needleValue NeedleValue) error {
var delta []needle.NeedleValue
if err := masterMap.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 { if needleValue.Key == 0 {
return nil return nil
} }
@ -100,7 +101,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
}); err != nil { }); err != nil {
return fmt.Errorf("Add master entry: %v", err) return fmt.Errorf("Add master entry: %v", err)
} }
if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error {
if needleValue.Key == 0 { if needleValue.Key == 0 {
return nil return nil
} }
@ -137,8 +138,8 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com
return nil return nil
} }
func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
m = NewCompactMap()
func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
m = needle.NewCompactMap()
syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil { if err != nil {
@ -149,9 +150,9 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
m.Set(Key(key), offset, size)
m.Set(needle.Key(key), offset, size)
} else { } else {
m.Delete(Key(key))
m.Delete(needle.Key(key))
} }
total++ total++
}) })
@ -178,7 +179,7 @@ func (v *Volume) IndexFileContent() ([]byte, error) {
} }
// removeNeedle removes one needle by needle key // removeNeedle removes one needle by needle key
func (v *Volume) removeNeedle(key Key) {
func (v *Volume) removeNeedle(key needle.Key) {
n := new(Needle) n := new(Needle)
n.Id = uint64(key) n.Id = uint64(key)
v.deleteNeedle(n) v.deleteNeedle(n)
@ -188,7 +189,7 @@ func (v *Volume) removeNeedle(key Key) {
// The compact revision is checked first in case the remote volume // The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more. // is compacted and the offset is invalid any more.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
needleValue NeedleValue, compactRevision uint16) error {
needleValue needle.NeedleValue, compactRevision uint16) error {
// add master file entry to local data file // add master file entry to local data file
values := make(url.Values) values := make(url.Values)
values.Add("revision", strconv.Itoa(int(compactRevision))) values.Add("revision", strconv.Itoa(int(compactRevision)))

4
weed/storage/volume_vacuum.go

@ -221,7 +221,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
} }
defer idx.Close() defer idx.Close()
nm := NewNeedleMap(idx)
nm := NewBtreeNeedleMap(idx)
new_offset := int64(SuperBlockSize) new_offset := int64(SuperBlockSize)
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
@ -272,7 +272,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
} }
defer oldIndexFile.Close() defer oldIndexFile.Close()
nm := NewNeedleMap(idx)
nm := NewBtreeNeedleMap(idx)
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
v.SuperBlock.CompactRevision++ v.SuperBlock.CompactRevision++

Loading…
Cancel
Save