diff --git a/weed/command/export.go b/weed/command/export.go index 5a7dc71d9..0f7496472 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -118,7 +118,7 @@ func runExport(cmd *Command, args []string) bool { } defer indexFile.Close() - needleMap, err := storage.LoadNeedleMap(indexFile) + needleMap, err := storage.LoadBtreeNeedleMap(indexFile) if err != nil { glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) } diff --git a/weed/command/fix.go b/weed/command/fix.go index 22480dcd0..f3103c6c2 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -43,7 +43,7 @@ func runFix(cmd *Command, args []string) bool { } defer indexFile.Close() - nm := storage.NewNeedleMap(indexFile) + nm := storage.NewBtreeNeedleMap(indexFile) defer nm.Close() vid := storage.VolumeId(*fixVolumeId) diff --git a/weed/command/server.go b/weed/command/server.go index e1152f23f..ad6916b8f 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -72,7 +72,7 @@ var ( volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") + volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") @@ -276,6 +276,8 @@ func runServer(cmd *Command, args []string) bool { volumeNeedleMapKind = storage.NeedleMapLevelDb case "boltdb": volumeNeedleMapKind = storage.NeedleMapBoltDb + case "btree": + volumeNeedleMapKind = storage.NeedleMapBtree } volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *serverIp, *volumePort, *volumeServerPublicUrl, diff --git a/weed/command/volume.go b/weed/command/volume.go index 0e69325b6..a4e316ecb 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -52,7 +52,7 @@ func init() { v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") - v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") + v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.") @@ -126,6 +126,8 @@ func runVolume(cmd *Command, args []string) bool { volumeNeedleMapKind = storage.NeedleMapLevelDb case "boltdb": volumeNeedleMapKind = storage.NeedleMapBoltDb + case "btree": + volumeNeedleMapKind = storage.NeedleMapBtree } volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, diff --git a/weed/storage/needle/btree_map.go b/weed/storage/needle/btree_map.go new file mode 100644 index 000000000..64c0bacc1 --- /dev/null +++ b/weed/storage/needle/btree_map.go @@ -0,0 +1,52 @@ +package needle + +import ( + "github.com/google/btree" +) + +//This map assumes mostly inserting increasing keys +type BtreeMap struct { + tree *btree.BTree +} + +func NewBtreeMap() *BtreeMap { + return &BtreeMap{ + tree: btree.New(32), + } +} + +func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { + found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size}) + if found != nil { + old := found.(NeedleValue) + return old.Offset, old.Size + } + return +} + +func (cm *BtreeMap) Delete(key Key) (oldSize uint32) { + found := cm.tree.Delete(NeedleValue{key, 0, 0}) + if found != nil { + old := found.(NeedleValue) + return old.Size + } + return +} +func (cm *BtreeMap) Get(key Key) (*NeedleValue, bool) { + found := cm.tree.Get(NeedleValue{key, 0, 0}) + if found != nil { + old := found.(NeedleValue) + return &old, true + } + return nil, false +} + +// Visit visits all entries or stop if any error when visiting +func (cm *BtreeMap) Visit(visit func(NeedleValue) error) (ret error) { + cm.tree.Ascend(func(item btree.Item) bool { + needle := item.(NeedleValue) + ret = visit(needle) + return ret == nil + }) + return ret +} diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle/compact_map.go new file mode 100644 index 000000000..ea2360fa7 --- /dev/null +++ b/weed/storage/needle/compact_map.go @@ -0,0 +1,194 @@ +package needle + +import ( + "sync" +) + +type CompactSection struct { + sync.RWMutex + values []NeedleValue + overflow map[Key]NeedleValue + start Key + end Key + counter int +} + +func NewCompactSection(start Key) *CompactSection { + return &CompactSection{ + values: make([]NeedleValue, batch), + overflow: make(map[Key]NeedleValue), + start: start, + } +} + +//return old entry size +func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { + cs.Lock() + if key > cs.end { + cs.end = key + } + if i := cs.binarySearchValues(key); i >= 0 { + oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size + //println("key", key, "old size", ret) + cs.values[i].Offset, cs.values[i].Size = offset, size + } else { + needOverflow := cs.counter >= batch + needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key + if needOverflow { + //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValue, found := cs.overflow[key]; found { + oldOffset, oldSize = oldValue.Offset, oldValue.Size + } + cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} + } else { + p := &cs.values[cs.counter] + p.Key, p.Offset, p.Size = key, offset, size + //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) + cs.counter++ + } + } + cs.Unlock() + return +} + +//return old entry size +func (cs *CompactSection) Delete(key Key) uint32 { + cs.Lock() + ret := uint32(0) + if i := cs.binarySearchValues(key); i >= 0 { + if cs.values[i].Size > 0 { + ret = cs.values[i].Size + cs.values[i].Size = 0 + } + } + if v, found := cs.overflow[key]; found { + delete(cs.overflow, key) + ret = v.Size + } + cs.Unlock() + return ret +} +func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { + cs.RLock() + if v, ok := cs.overflow[key]; ok { + cs.RUnlock() + return &v, true + } + if i := cs.binarySearchValues(key); i >= 0 { + cs.RUnlock() + return &cs.values[i], true + } + cs.RUnlock() + return nil, false +} +func (cs *CompactSection) binarySearchValues(key Key) int { + l, h := 0, cs.counter-1 + if h >= 0 && cs.values[h].Key < key { + return -2 + } + //println("looking for key", key) + for l <= h { + m := (l + h) / 2 + //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) + if cs.values[m].Key < key { + l = m + 1 + } else if key < cs.values[m].Key { + h = m - 1 + } else { + //println("found", m) + return m + } + } + return -1 +} + +//This map assumes mostly inserting increasing keys +//This map assumes mostly inserting increasing keys +type CompactMap struct { + list []*CompactSection +} + +func NewCompactMap() *CompactMap { + return &CompactMap{} +} + +func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + //println(x, "creating", len(cm.list), "section, starting", key) + cm.list = append(cm.list, NewCompactSection(key)) + x = len(cm.list) - 1 + //keep compact section sorted by start + for x > 0 { + if cm.list[x-1].start > cm.list[x].start { + cm.list[x-1], cm.list[x] = cm.list[x], cm.list[x-1] + x = x - 1 + } else { + break + } + } + } + return cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key Key) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return uint32(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key Key) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if cm.list[h].counter < batch || key <= cm.list[h].end { + return h + } + return -4 + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +// Visit visits all entries or stop if any error when visiting +func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { + for _, cs := range cm.list { + cs.RLock() + for _, v := range cs.overflow { + if err := visit(v); err != nil { + cs.RUnlock() + return err + } + } + for _, v := range cs.values { + if _, found := cs.overflow[v.Key]; !found { + if err := visit(v); err != nil { + cs.RUnlock() + return err + } + } + } + cs.RUnlock() + } + return nil +} diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/needle/compact_map_perf_test.go similarity index 79% rename from weed/storage/compact_map_perf_test.go rename to weed/storage/needle/compact_map_perf_test.go index cc7669139..8a26e7ed3 100644 --- a/weed/storage/compact_map_perf_test.go +++ b/weed/storage/needle/compact_map_perf_test.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "log" @@ -11,15 +11,15 @@ import ( func TestMemoryUsage(t *testing.T) { - indexFile, ie := os.OpenFile("../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) if ie != nil { log.Fatalln(ie) } - LoadNewNeedleMap(indexFile) + loadNewNeedleMap(indexFile) } -func LoadNewNeedleMap(file *os.File) CompactMap { +func loadNewNeedleMap(file *os.File) { m := NewCompactMap() bytes := make([]byte, 16*1024) count, e := file.Read(bytes) @@ -41,5 +41,4 @@ func LoadNewNeedleMap(file *os.File) CompactMap { count, e = file.Read(bytes) } - return m } diff --git a/weed/storage/compact_map_test.go b/weed/storage/needle/compact_map_test.go similarity index 99% rename from weed/storage/compact_map_test.go rename to weed/storage/needle/compact_map_test.go index 1ccb48edb..4d574bafe 100644 --- a/weed/storage/compact_map_test.go +++ b/weed/storage/needle/compact_map_test.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "testing" diff --git a/weed/storage/needle/needle_value.go b/weed/storage/needle/needle_value.go new file mode 100644 index 000000000..137ab0814 --- /dev/null +++ b/weed/storage/needle/needle_value.go @@ -0,0 +1,28 @@ +package needle + +import ( + "strconv" + + "github.com/google/btree" +) + +const ( + batch = 100000 +) + +type NeedleValue struct { + Key Key + Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 `comment:"Size of the data portion"` +} + +func (this NeedleValue) Less(than btree.Item) bool { + that := than.(NeedleValue) + return this.Key < that.Key +} + +type Key uint64 + +func (k Key) String() string { + return strconv.FormatUint(uint64(k), 10) +} diff --git a/weed/storage/needle/needle_value_map.go b/weed/storage/needle/needle_value_map.go new file mode 100644 index 000000000..81f41b235 --- /dev/null +++ b/weed/storage/needle/needle_value_map.go @@ -0,0 +1,8 @@ +package needle + +type NeedleValueMap interface { + Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) + Delete(key Key) uint32 + Get(key Key) (*NeedleValue, bool) + Visit(visit func(NeedleValue) error) error +} diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 15a0387c5..14e4ccf3a 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -6,6 +6,7 @@ import ( "os" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -15,6 +16,7 @@ const ( NeedleMapInMemory NeedleMapType = iota NeedleMapLevelDb NeedleMapBoltDb + NeedleMapBtree ) const ( @@ -23,7 +25,7 @@ const ( type NeedleMapper interface { Put(key uint64, offset uint32, size uint32) error - Get(key uint64) (element *NeedleValue, ok bool) + Get(key uint64) (element *needle.NeedleValue, ok bool) Delete(key uint64, offset uint32) error Close() Destroy() error diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go index e131ea822..cbcc786af 100644 --- a/weed/storage/needle_map_boltdb.go +++ b/weed/storage/needle_map_boltdb.go @@ -7,6 +7,7 @@ import ( "github.com/boltdb/bolt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -31,7 +32,7 @@ func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleM return } glog.V(1).Infof("Loading %s...", indexFile.Name()) - nm, indexLoadError := LoadNeedleMap(indexFile) + nm, indexLoadError := LoadBtreeNeedleMap(indexFile) if indexLoadError != nil { return nil, indexLoadError } @@ -72,7 +73,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error { }) } -func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { +func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { bytes := make([]byte, 8) var data []byte util.Uint64toBytes(bytes, key) @@ -91,7 +92,7 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { } offset := util.BytesToUint32(data[0:4]) size := util.BytesToUint32(data[4:8]) - return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true + return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true } func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index f025ea360..bd2c7c886 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -6,6 +6,7 @@ import ( "path/filepath" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" ) @@ -29,7 +30,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl return } glog.V(1).Infof("Loading %s...", indexFile.Name()) - nm, indexLoadError := LoadNeedleMap(indexFile) + nm, indexLoadError := LoadBtreeNeedleMap(indexFile) if indexLoadError != nil { return nil, indexLoadError } @@ -70,7 +71,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { }) } -func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { +func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { bytes := make([]byte, 8) util.Uint64toBytes(bytes, key) data, err := m.db.Get(bytes, nil) @@ -79,7 +80,7 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { } offset := util.BytesToUint32(data[0:4]) size := util.BytesToUint32(data[4:8]) - return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true + return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true } func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index ccbb21317..f34a57849 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -5,17 +5,26 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NeedleMap struct { - m CompactMap + m needle.NeedleValueMap baseNeedleMapper } -func NewNeedleMap(file *os.File) *NeedleMap { +func NewCompactNeedleMap(file *os.File) *NeedleMap { nm := &NeedleMap{ - m: NewCompactMap(), + m: needle.NewCompactMap(), + } + nm.indexFile = file + return nm +} + +func NewBtreeNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: needle.NewBtreeMap(), } nm.indexFile = file return nm @@ -25,8 +34,17 @@ const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewNeedleMap(file) +func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { + nm := NewCompactNeedleMap(file) + return doLoading(file, nm) +} + +func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) { + nm := NewBtreeNeedleMap(file) + return doLoading(file, nm) +} + +func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { if key > nm.MaximumFileKey { nm.MaximumFileKey = key @@ -34,14 +52,14 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) { if offset > 0 && size != TombstoneFileSize { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) - oldOffset, oldSize := nm.m.Set(Key(key), offset, size) + oldOffset, oldSize := nm.m.Set(needle.Key(key), offset, size) glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if oldOffset > 0 && oldSize != TombstoneFileSize { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } else { - oldSize := nm.m.Delete(Key(key)) + oldSize := nm.m.Delete(needle.Key(key)) glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) @@ -84,16 +102,16 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { - _, oldSize := nm.m.Set(Key(key), offset, size) + _, oldSize := nm.m.Set(needle.Key(key), offset, size) nm.logPut(key, oldSize, size) return nm.appendToIndexFile(key, offset, size) } -func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) +func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { + element, ok = nm.m.Get(needle.Key(key)) return } func (nm *NeedleMap) Delete(key uint64, offset uint32) error { - deletedBytes := nm.m.Delete(Key(key)) + deletedBytes := nm.m.Delete(needle.Key(key)) nm.logDelete(deletedBytes) return nm.appendToIndexFile(key, offset, TombstoneFileSize) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 4be860987..457d50410 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -70,20 +70,25 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } switch needleMapKind { case NeedleMapInMemory: - glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) - if v.nm, e = LoadNeedleMap(indexFile); e != nil { - glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) + glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly) + if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e) } case NeedleMapLevelDb: - glog.V(0).Infoln("loading leveldb file", fileName+".ldb") + glog.V(0).Infoln("loading leveldb", fileName+".ldb") if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } case NeedleMapBoltDb: - glog.V(0).Infoln("loading boltdb file", fileName+".bdb") + glog.V(0).Infoln("loading boltdb", fileName+".bdb") if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) } + case NeedleMapBtree: + glog.V(0).Infoln("loading index", fileName+".idx", "to btree readonly", v.readOnly) + if v.nm, e = LoadBtreeNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s to btree error: %v", fileName+".idx", e) + } } } diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index 23d8db510..d7cae8803 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -48,7 +49,7 @@ optimized more later). func (v *Volume) Synchronize(volumeServer string) (err error) { var lastCompactRevision uint16 = 0 var compactRevision uint16 = 0 - var masterMap CompactMap + var masterMap *needle.CompactMap for i := 0; i < 3; i++ { if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) @@ -69,7 +70,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) { return } -type ByOffset []NeedleValue +type ByOffset []needle.NeedleValue func (a ByOffset) Len() int { return len(a) } func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -77,18 +78,18 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } // trySynchronizing sync with remote volume server incrementally by // make up the local and remote delta. -func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { +func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error { slaveIdxFile, err := os.Open(v.nm.IndexFileName()) if err != nil { return fmt.Errorf("Open volume %d index file: %v", v.Id, err) } defer slaveIdxFile.Close() - slaveMap, err := LoadNeedleMap(slaveIdxFile) + slaveMap, err := LoadBtreeNeedleMap(slaveIdxFile) if err != nil { return fmt.Errorf("Load volume %d index file: %v", v.Id, err) } - var delta []NeedleValue - if err := masterMap.Visit(func(needleValue NeedleValue) error { + var delta []needle.NeedleValue + if err := masterMap.Visit(func(needleValue needle.NeedleValue) error { if needleValue.Key == 0 { return nil } @@ -100,7 +101,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com }); err != nil { return fmt.Errorf("Add master entry: %v", err) } - if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if err := slaveMap.m.Visit(func(needleValue needle.NeedleValue) error { if needleValue.Key == 0 { return nil } @@ -137,8 +138,8 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, com return nil } -func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { - m = NewCompactMap() +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = needle.NewCompactMap() syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) if err != nil { @@ -149,9 +150,9 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) if offset > 0 && size != TombstoneFileSize { - m.Set(Key(key), offset, size) + m.Set(needle.Key(key), offset, size) } else { - m.Delete(Key(key)) + m.Delete(needle.Key(key)) } total++ }) @@ -178,7 +179,7 @@ func (v *Volume) IndexFileContent() ([]byte, error) { } // removeNeedle removes one needle by needle key -func (v *Volume) removeNeedle(key Key) { +func (v *Volume) removeNeedle(key needle.Key) { n := new(Needle) n.Id = uint64(key) v.deleteNeedle(n) @@ -188,7 +189,7 @@ func (v *Volume) removeNeedle(key Key) { // The compact revision is checked first in case the remote volume // is compacted and the offset is invalid any more. func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, - needleValue NeedleValue, compactRevision uint16) error { + needleValue needle.NeedleValue, compactRevision uint16) error { // add master file entry to local data file values := make(url.Values) values.Add("revision", strconv.Itoa(int(compactRevision))) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 07916fe6b..f6f68d59b 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -221,7 +221,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca } defer idx.Close() - nm := NewNeedleMap(idx) + nm := NewBtreeNeedleMap(idx) new_offset := int64(SuperBlockSize) now := uint64(time.Now().Unix()) @@ -272,7 +272,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { } defer oldIndexFile.Close() - nm := NewNeedleMap(idx) + nm := NewBtreeNeedleMap(idx) now := uint64(time.Now().Unix()) v.SuperBlock.CompactRevision++