diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go new file mode 100644 index 000000000..b80b3cde9 --- /dev/null +++ b/weed-fs/src/pkg/storage/cdb_map.go @@ -0,0 +1,107 @@ +package storage + +import ( + "github.com/tgulacsi/go-cdb" + "io" + "log" + "os" + "pkg/util" + "strings" +) + +type CdbMap struct { + db *cdb.Cdb + transient []byte + Filename string +} + +// Opens the CDB file and servers as a needle map +func NewCdbMap(filename string) (*CdbMap, error) { + m, err := cdb.Open(filename) + if err != nil { + return nil, err + } + return &CdbMap{db: m, transient: make([]byte, 8), + Filename: filename}, nil +} + +// writes the content of the index file to a CDB and returns that +func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) { + nm := indexFile.Name() + nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb" + + var ( + key uint64 + offset uint32 + ok bool + ) + deleted := make(map[uint64]bool, 16) + gatherDeletes := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + if offset > 0 { + if _, ok = deleted[key]; ok { //undelete + delete(deleted, key) + } + } else { + deleted[key] = true + } + return nil + } + if err := readIndexFile(indexFile, gatherDeletes); err != nil { + return nil, err + } + + w, err := cdb.NewWriter(nm) + if err != nil { + return nil, err + } + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + if _, ok = deleted[key]; !ok { + w.PutPair(buf[:8], buf[8:16]) + } + return nil + } + indexFile.Seek(0, 0) + err = readIndexFile(indexFile, iterFun) + w.Close() + if err != nil { + return nil, err + } + + return NewCdbMap(nm) +} + +func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) { + util.Uint64toBytes(m.transient, uint64(key)) + data, err := m.db.Data(m.transient) + if err != nil { + if err == io.EOF { + return nil, false + } + log.Printf("error getting %s: %s", key, err) + return nil, false + } + return &NeedleValue{Key: key, + Offset: util.BytesToUint32(data[:4]), + Size: util.BytesToUint32(data[4:8]), + }, true +} + +func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) { + r, err := os.Open(m.Filename) + if err != nil { + return err + } + defer r.Close() + + iterFunc := func(elt cdb.Element) error { + return pedestrian(&NeedleValue{ + Key: Key(util.BytesToUint64(elt.Key[:8])), + Offset: util.BytesToUint32(elt.Data[:4]), + Size: util.BytesToUint32(elt.Data[4:8]), + }) + } + return cdb.DumpMap(r, iterFunc) +} diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go index 90ed42198..61cc2c841 100644 --- a/weed-fs/src/pkg/storage/compact_map.go +++ b/weed-fs/src/pkg/storage/compact_map.go @@ -109,8 +109,8 @@ type CompactMap struct { list []CompactSection } -func NewCompactMap() CompactMap { - return CompactMap{} +func NewCompactMap() *CompactMap { + return &CompactMap{} } func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9ce38c1f2..9d7369509 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,6 +1,7 @@ package storage import ( + "io" "log" "os" "pkg/util" @@ -8,7 +9,8 @@ import ( type NeedleMap struct { indexFile *os.File - m CompactMap + m MapGetSetter // modifiable map + fm MapGetter // frozen map //transient bytes []byte @@ -19,52 +21,106 @@ type NeedleMap struct { fileByteCounter uint64 } +// Map interface for frozen maps +type MapGetter interface { + Get(key Key) (element *NeedleValue, ok bool) + Walk(pedestrian func(*NeedleValue) error) error +} + +// Modifiable map interface +type MapSetter interface { + Set(key Key, offset, size uint32) (oldsize uint32) + Delete(key Key) uint32 +} + +// Settable and gettable map +type MapGetSetter interface { + MapGetter + MapSetter +} + +// New in-memory needle map, backed by "file" index file func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ + return &NeedleMap{ m: NewCompactMap(), bytes: make([]byte, 16), indexFile: file, } - return nm +} + +// Nes frozen (on-disk, not modifiable(!)) needle map +func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) { + fm, err := NewCdbMapFromIndex(file) + if err != nil { + return nil, err + } + return &NeedleMap{ + fm: fm, + bytes: make([]byte, 16), + }, nil } const ( RowsToRead = 1024 ) -func LoadNeedleMap(file *os.File) *NeedleMap { +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) - bytes := make([]byte, 16*RowsToRead) - count, e := nm.indexFile.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + + var ( + key uint64 + offset, size, oldSize uint32 + ) + iterFun := func(buf []byte) error { + key = util.BytesToUint64(buf[:8]) + offset = util.BytesToUint32(buf[8:12]) + size = util.BytesToUint32(buf[12:16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize = nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + nm.m.Delete(Key(key)) + //log.Println("removing key", key) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + } + + return nil + } + if err := readIndexFile(file, iterFun); err != nil { + return nil, err + } + return nm, nil +} + +// calls iterFun with each row (raw 16 bytes) +func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error { + buf := make([]byte, 16*RowsToRead) + count, e := io.ReadAtLeast(indexFile, buf, 16) + if e != nil && count > 0 { + fstat, err := indexFile.Stat() + if err != nil { + log.Println("ERROR stating %s: %s", indexFile, err) + } else { + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } } for count > 0 && e == nil { for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } else { - nm.m.Delete(Key(key)) - //log.Println("removing key", key) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) + if e = iterFun(buf[i : i+16]); e != nil { + return e } } - count, e = nm.indexFile.Read(bytes) + count, e = io.ReadAtLeast(indexFile, buf, 16) } - return nm + return nil } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 5e64d0763..71dfb5aee 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -48,8 +48,8 @@ func (v *Volume) load() error { if ie != nil { return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } - v.nm = LoadNeedleMap(indexFile) - return nil + v.nm, e = LoadNeedleMap(indexFile) + return e } func (v *Volume) Version() Version { return v.version