From 715d327df0ad64a70837711c664e1ef024e0bcc5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 May 2013 11:19:41 -0700 Subject: [PATCH] =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi=20contributed=20cdb?= =?UTF-8?q?=20map=20for=20read=20only=20extremely=20low=20memory=20impleme?= =?UTF-8?q?ntation.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go/storage/cdb_map.go | 230 +++++++++++++++++++++++++++++++++++++ go/storage/cdb_map_test.go | 168 +++++++++++++++++++++++++++ 2 files changed, 398 insertions(+) create mode 100644 go/storage/cdb_map.go create mode 100644 go/storage/cdb_map_test.go diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go new file mode 100644 index 000000000..ebb49d514 --- /dev/null +++ b/go/storage/cdb_map.go @@ -0,0 +1,230 @@ +package storage + +import ( + "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "fmt" + "github.com/tgulacsi/go-cdb" + "os" + "path/filepath" +) + +// CDB-backed read-only needle map +type cdbMap struct { + c1, c2 *cdb.Cdb + fn1, fn2 string + mapMetric +} + +const maxCdbRecCount = 100000000 + +var errReadOnly = errors.New("cannot modify a read-only map") + +// opens the cdb file(s) (base.cdb OR base.1.cdb AND base.2.cdb) +// in case of two files, the metric (at key 'M') must be in base.2.cdb +func OpenCdbMap(fileName string) (m *cdbMap, err error) { + m = new(cdbMap) + if m.c1, err = cdb.Open(fileName); err == nil { + m.fn1 = fileName + err = getMetric(m.c1, &m.mapMetric) + return + } + if os.IsNotExist(err) { + bn, ext := nakeFilename(fileName) + m.fn1 = bn + ".1" + ext + if m.c1, err = cdb.Open(m.fn1); err != nil { + return nil, err + } + m.fn2 = bn + ".2" + ext + if m.c2, err = cdb.Open(m.fn2); err != nil { + return nil, err + } + err = getMetric(m.c2, &m.mapMetric) + return + } + return nil, err +} + +func (m *cdbMap) Put(key uint64, offset uint32, size uint32) (int, error) { + return -1, errReadOnly +} +func (m *cdbMap) Delete(key uint64) error { + return errReadOnly +} + +func (m *cdbMap) Close() { + if m.c2 != nil { + m.c2.Close() + m.c2 = nil + } + if m.c1 != nil { + m.c1.Close() + m.c1 = nil + } +} + +func (m cdbMap) ContentSize() uint64 { + return m.FileByteCounter +} +func (m cdbMap) DeletedSize() uint64 { + return m.DeletionByteCounter +} +func (m cdbMap) FileCount() int { + return m.FileCounter +} +func (m *cdbMap) DeletedCount() int { + return m.DeletionCounter +} + +func getMetric(c *cdb.Cdb, m *mapMetric) error { + data, err := c.Data([]byte{'M'}) + if err != nil { + return err + } + return json.Unmarshal(data, m) +} + +func (m cdbMap) Get(key uint64) (element *NeedleValue, ok bool) { + var ( + data []byte + k []byte = make([]byte, 8) + err error + ) + util.Uint64toBytes(k, key) + if data, err = m.c1.Data(k); err != nil || data == nil { + if m.c2 == nil { + return nil, false + } + if data, err = m.c2.Data(k); err != nil || data == nil { + return nil, false + } + } + return &NeedleValue{Key: Key(key), Offset: util.BytesToUint32(data[:4]), + Size: util.BytesToUint32(data[4:])}, true +} + +func (m cdbMap) Visit(visit func(NeedleValue) error) (err error) { + fh, err := os.Open(m.fn1) + if err != nil { + return fmt.Errorf("cannot open %s: %s", m.fn1, err) + } + defer fh.Close() + walk := func(elt cdb.Element) error { + if len(elt.Key) != 8 { + return nil + } + return visit(NeedleValue{Key: Key(util.BytesToUint64(elt.Key)), + Offset: util.BytesToUint32(elt.Data[:4]), + Size: util.BytesToUint32(elt.Data[4:8])}) + } + if err = cdb.DumpMap(fh, walk); err != nil { + return err + } + if m.c2 == nil { + return nil + } + fh.Close() + if fh, err = os.Open(m.fn2); err != nil { + return fmt.Errorf("cannot open %s: %s", m.fn2, err) + } + return cdb.DumpMap(fh, walk) +} + +// converts an .idx index to a cdb +func ConvertIndexToCdb(cdbName string, index *os.File) error { + idx, err := LoadNeedleMap(index) + if err != nil { + return fmt.Errorf("error loading needle map %s: %s", index, err) + } + defer idx.Close() + return DumpNeedleMapToCdb(cdbName, idx) +} + +// dumps a NeedleMap into a cdb +func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error { + tempnam := cdbName + "t" + fnames := make([]string, 1, 2) + adder, closer, err := openTempCdb(tempnam) + if err != nil { + return fmt.Errorf("error creating factory: %s", err) + } + fnames[0] = tempnam + + elt := cdb.Element{Key: make([]byte, 8), Data: make([]byte, 8)} + + fcount := uint64(0) + walk := func(key uint64, offset, size uint32) error { + if fcount >= maxCdbRecCount { + if err = closer(); err != nil { + return err + } + tempnam = cdbName + "t2" + if adder, closer, err = openTempCdb(tempnam); err != nil { + return fmt.Errorf("error creating second factory: %s", err) + } + fnames = append(fnames, tempnam) + fcount = 0 + } + util.Uint64toBytes(elt.Key, key) + util.Uint32toBytes(elt.Data[:4], offset) + util.Uint32toBytes(elt.Data[4:], size) + fcount++ + return adder(elt) + } + // and write out the cdb from there + err = nm.Visit(func(nv NeedleValue) error { + return walk(uint64(nv.Key), nv.Offset, nv.Size) + }) + if err != nil { + closer() + return fmt.Errorf("error walking index %s: %s", nm, err) + } + // store fileBytes + data, e := json.Marshal(nm.mapMetric) + if e != nil { + return fmt.Errorf("error marshaling metric %s: %s", nm.mapMetric, e) + } + if err = adder(cdb.Element{Key: []byte{'M'}, Data: data}); err != nil { + return err + } + if err = closer(); err != nil { + return err + } + + os.Remove(cdbName) + if len(fnames) == 1 { + return os.Rename(fnames[0], cdbName) + } + bn, ext := nakeFilename(cdbName) + if err = os.Rename(fnames[0], bn+".1"+ext); err != nil { + return err + } + return os.Rename(fnames[1], bn+".2"+ext) +} + +func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) { + fh, err := os.Create(fileName) + if err != nil { + return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err) + } + adder, closer, err := cdb.MakeFactory(fh) + if err != nil { + fh.Close() + return nil, nil, fmt.Errorf("error creating factory: %s", err) + } + return adder, func() error { + if e := closer(); e != nil { + fh.Close() + return e + } + fh.Close() + return nil + }, nil +} + +// returns filename without extension, and the extension +func nakeFilename(fileName string) (string, string) { + ext := filepath.Ext(fileName) + return fileName[:len(fileName)-len(ext)], ext +} diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go new file mode 100644 index 000000000..e932a7ec5 --- /dev/null +++ b/go/storage/cdb_map_test.go @@ -0,0 +1,168 @@ +package storage + +import ( + "log" + "math/rand" + "os" + "runtime" + "testing" +) + +var testIndexFilename string = "../../test/sample.idx" + +func TestCdbMap0Convert(t *testing.T) { + indexFile, err := os.Open(testIndexFilename) + if err != nil { + t.Fatalf("cannot open %s: %s", testIndexFilename, err) + } + defer indexFile.Close() + + cdbFn := testIndexFilename + ".cdb" + t.Logf("converting %s to %s", cdbFn, cdbFn) + if err = ConvertIndexToCdb(cdbFn, indexFile); err != nil { + t.Fatalf("error while converting: %s", err) + } +} + +func TestCdbMap1Mem(t *testing.T) { + var nm NeedleMapper + i := 0 + visit := func(nv NeedleValue) error { + i++ + return nil + } + + a := getMemStats() + t.Logf("opening %s.cdb", testIndexFilename) + nm, err := OpenCdbMap(testIndexFilename + ".cdb") + if err != nil { + t.Fatalf("error opening cdb: %s", err) + } + b := getMemStats() + log.Printf("opening cdb consumed %d bytes", b-a) + defer nm.Close() + + a = getMemStats() + if err = nm.Visit(visit); err != nil { + t.Fatalf("error visiting %s: %s", nm, err) + } + b = getMemStats() + log.Printf("visit cdb %d consumed %d bytes", i, b-a) + nm.Close() + + indexFile, err := os.Open(testIndexFilename) + if err != nil { + t.Fatalf("error opening idx: %s", err) + } + a = getMemStats() + nm, err = LoadNeedleMap(indexFile) + if err != nil { + t.Fatalf("error loading idx: %s", err) + } + defer nm.Close() + b = getMemStats() + log.Printf("opening idx consumed %d bytes", b-a) + + i = 0 + a = getMemStats() + if err = nm.Visit(visit); err != nil { + t.Fatalf("error visiting %s: %s", nm, err) + } + b = getMemStats() + log.Printf("visit idx %d consumed %d bytes", i, b-a) +} + +func BenchmarkCdbMap9List(t *testing.B) { + t.StopTimer() + indexFile, err := os.Open(testIndexFilename) + if err != nil { + t.Fatalf("cannot open %s: %s", testIndexFilename, err) + } + defer indexFile.Close() + + a := getMemStats() + t.Logf("opening %s", indexFile) + idx, err := LoadNeedleMap(indexFile) + if err != nil { + t.Fatalf("cannot load %s: %s", indexFile, err) + } + defer idx.Close() + b := getMemStats() + log.Printf("LoadNeedleMap consumed %d bytes", b-a) + + cdbFn := testIndexFilename + ".cdb" + a = getMemStats() + t.Logf("opening %s", cdbFn) + m, err := OpenCdbMap(cdbFn) + if err != nil { + t.Fatalf("error opening %s: %s", cdbFn, err) + } + defer m.Close() + b = getMemStats() + log.Printf("OpenCdbMap consumed %d bytes", b-a) + + i := 0 + log.Printf("checking whether the cdb contains every key") + t.StartTimer() + err = idx.Visit(func(nv NeedleValue) error { + if i > t.N || rand.Intn(10) < 9 { + return nil + } + i++ + if i%1000 == 0 { + log.Printf("%d. %s", i, nv) + } + if nv2, ok := m.Get(uint64(nv.Key)); !ok || nv2 == nil { + t.Errorf("%s in index, not in cdb", nv.Key) + } else if nv2.Key != nv.Key { + t.Errorf("requested key %d from cdb, got %d", nv.Key, nv2.Key) + } else if nv2.Offset != nv.Offset { + t.Errorf("offset is %d in index, %d in cdb", nv.Offset, nv2.Offset) + } else if nv2.Size != nv.Size { + t.Errorf("size is %d in index, %d in cdb", nv.Size, nv2.Size) + } + t.SetBytes(int64(nv.Size)) + return nil + }) + t.StopTimer() + if err != nil { + t.Errorf("error visiting index: %s", err) + } + + i = 0 + log.Printf("checking wheter the cdb contains no stray keys") + t.StartTimer() + err = m.Visit(func(nv NeedleValue) error { + if i > t.N || rand.Intn(10) < 9 { + return nil + } + if nv2, ok := m.Get(uint64(nv.Key)); !ok || nv2 == nil { + t.Errorf("%s in cdb, not in index", nv.Key) + } else if nv2.Key != nv.Key { + t.Errorf("requested key %d from index, got %d", nv.Key, nv2.Key) + } else if nv2.Offset != nv.Offset { + t.Errorf("offset is %d in cdb, %d in index", nv.Offset, nv2.Offset) + } else if nv2.Size != nv.Size { + t.Errorf("size is %d in cdb, %d in index", nv.Size, nv2.Size) + } + i++ + if i%1000 == 0 { + log.Printf("%d. %s", i, nv) + } + t.SetBytes(int64(nv.Size)) + return nil + }) + t.StopTimer() + if err != nil { + t.Errorf("error visiting index: %s", err) + } +} + +var mem = new(runtime.MemStats) + +// returns MemStats.Alloc after a GC +func getMemStats() int64 { + runtime.GC() + runtime.ReadMemStats(mem) + return int64(mem.Alloc) +}