From 020ba6c9a85efd9f2bf5f3c7ce96b38857f2128e Mon Sep 17 00:00:00 2001
From: chrislusf
Date: Fri, 27 Mar 2015 16:34:58 -0700
Subject: [PATCH] add leveldb support for needle map

This should supposedly reduce memory consumption. However, in tests with
millions of files it consumes more memory. Need to see whether this will
work out. If not, boltdb will be tested later.
---
 go/storage/needle_map.go                  | 158 +++---------------
 go/storage/needle_map_leveldb.go          | 151 +++++++++++++++++
 go/storage/needle_map_memory.go           | 126 ++++++++++++++
 go/storage/store.go                       |  40 ++---
 go/storage/volume.go                      |  83 ++-------
 go/storage/volume_vacuum.go               |   4 +-
 go/util/http_util.go                      |   4 +-
 go/weed/compact.go                        |   2 +-
 go/weed/fix.go                            |   4 +-
 go/weed/server.go                         |   2 +
 go/weed/volume.go                         |   3 +
 go/weed/weed_server/volume_server.go      |  10 +-
 .../volume_server_handlers_admin.go       |  13 +-
 13 files changed, 347 insertions(+), 253 deletions(-)
 create mode 100644 go/storage/needle_map_leveldb.go
 create mode 100644 go/storage/needle_map_memory.go

diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index f74191742..0bfa12180 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -2,15 +2,13 @@ package storage
 
 import (
 	"fmt"
-	"io"
 	"os"
 
-	"github.com/chrislusf/weed-fs/go/glog"
 	"github.com/chrislusf/weed-fs/go/util"
 )
 
 type NeedleMapper interface {
-	Put(key uint64, offset uint32, size uint32) (int, error)
+	Put(key uint64, offset uint32, size uint32) error
 	Get(key uint64) (element *NeedleValue, ok bool)
 	Delete(key uint64) error
 	Close()
@@ -19,7 +17,6 @@ type NeedleMapper interface {
 	DeletedSize() uint64
 	FileCount() int
 	DeletedCount() int
-	Visit(visit func(NeedleValue) error) (err error)
 	MaxFileKey() uint64
 }
 
@@ -31,146 +28,33 @@ type mapMetric struct {
 	MaximumFileKey      uint64 `json:"MaxFileKey"`
 }
 
-type NeedleMap struct {
-	indexFile *os.File
-	m         CompactMap
-
-	mapMetric
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
-	nm := &NeedleMap{
-		m:         NewCompactMap(),
-		indexFile: file,
-	}
-	return nm
-}
-
-const (
-	RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
-	nm := NewNeedleMap(file)
-	e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
-		if key > nm.MaximumFileKey {
-			nm.MaximumFileKey = key
-		}
-		nm.FileCounter++
-		nm.FileByteCounter = nm.FileByteCounter + uint64(size)
-		if offset > 0 {
-			oldSize := nm.m.Set(Key(key), offset, size)
-			glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
-			if oldSize > 0 {
-				nm.DeletionCounter++
-				nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-			}
-		} else {
-			oldSize := nm.m.Delete(Key(key))
-			glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
-			nm.DeletionCounter++
-			nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-		}
-		return nil
-	})
-	glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
-	return nm, e
-}
-
-// walks through the index file, calls fn function with each key, offset, size
-// stops with the error returned by the fn function
-func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
-	var readerOffset int64
-	bytes := make([]byte, 16*RowsToRead)
-	count, e := r.ReadAt(bytes, readerOffset)
-	glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
-	readerOffset += int64(count)
-	var (
-		key          uint64
-		offset, size uint32
-		i            int
-	)
-
-	for count > 0 && e == nil || e == io.EOF {
-		for i = 0; i+16 <= count; i += 16 {
-			key = util.BytesToUint64(bytes[i : i+8])
-			offset = util.BytesToUint32(bytes[i+8 : i+12])
-			size = util.BytesToUint32(bytes[i+12 : i+16])
-			if e = fn(key, offset, size); e != nil {
-				return e
-			}
-		}
-		if e == io.EOF {
-			return nil
-		}
-		count, e = r.ReadAt(bytes, readerOffset)
-		glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
-		readerOffset += int64(count)
-	}
-	return e
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
-	if key > nm.MaximumFileKey {
-		nm.MaximumFileKey = key
-	}
-	oldSize := nm.m.Set(Key(key), offset, size)
+func appendToIndexFile(indexFile *os.File,
+	key uint64, offset uint32, size uint32) error {
 	bytes := make([]byte, 16)
 	util.Uint64toBytes(bytes[0:8], key)
 	util.Uint32toBytes(bytes[8:12], offset)
 	util.Uint32toBytes(bytes[12:16], size)
-	nm.FileCounter++
-	nm.FileByteCounter = nm.FileByteCounter + uint64(size)
-	if oldSize > 0 {
-		nm.DeletionCounter++
-		nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-	}
-	if _, err := nm.indexFile.Seek(0, 2); err != nil {
-		return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+	if _, err := indexFile.Seek(0, 2); err != nil {
+		return fmt.Errorf("cannot seek end of indexfile %s: %v",
+			indexFile.Name(), err)
 	}
-	return nm.indexFile.Write(bytes)
+	_, err := indexFile.Write(bytes)
+	return err
 }
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
-	element, ok = nm.m.Get(Key(key))
-	return
+
+func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+	mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
+	mm.DeletionCounter++
 }
-func (nm *NeedleMap) Delete(key uint64) error {
-	nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
-	bytes := make([]byte, 16)
-	util.Uint64toBytes(bytes[0:8], key)
-	util.Uint32toBytes(bytes[8:12], 0)
-	util.Uint32toBytes(bytes[12:16], 0)
-	if _, err := nm.indexFile.Seek(0, 2); err != nil {
-		return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
+
+func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
+	if key > mm.MaximumFileKey {
+		mm.MaximumFileKey = key
 	}
-	if _, err := nm.indexFile.Write(bytes); err != nil {
-		return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err)
+	mm.FileCounter++
+	mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+	if oldSize > 0 {
+		mm.DeletionCounter++
+		mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
 	}
-	nm.DeletionCounter++
-	return nil
-}
-func (nm *NeedleMap) Close() {
-	_ = nm.indexFile.Close()
-}
-func (nm *NeedleMap) Destroy() error {
-	nm.Close()
-	return os.Remove(nm.indexFile.Name())
-}
-func (nm NeedleMap) ContentSize() uint64 {
-	return nm.FileByteCounter
-}
-func (nm NeedleMap) DeletedSize() uint64 {
-	return nm.DeletionByteCounter
-}
-func (nm NeedleMap) FileCount() int {
-	return nm.FileCounter
-}
-func (nm NeedleMap) DeletedCount() int {
-	return nm.DeletionCounter
-}
-func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
-	return nm.m.Visit(visit)
-}
-func (nm NeedleMap) MaxFileKey() uint64 {
-	return nm.MaximumFileKey
 }
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
new file mode 100644
index 000000000..73595278d
--- /dev/null
+++ b/go/storage/needle_map_leveldb.go
@@ -0,0 +1,151 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/chrislusf/weed-fs/go/glog"
+	"github.com/chrislusf/weed-fs/go/util"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+type LevelDbNeedleMap struct {
+	dbFileName string
+	indexFile  *os.File
+	db         *leveldb.DB
+	mapMetric
+}
+
+func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
+	m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+	if !isLevelDbFresh(dbFileName, indexFile) {
+		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+		generateDbFile(dbFileName, indexFile)
+		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+	}
+	glog.V(1).Infof("Opening %s...", dbFileName)
+	if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
+		return
+	}
+	glog.V(1).Infof("Loading %s...", indexFile.Name())
+	nm, indexLoadError := LoadNeedleMap(indexFile)
+	if indexLoadError != nil {
+		return nil, indexLoadError
+	}
+	m.mapMetric = nm.mapMetric
+	return
+}
+
+func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
+	// normally we always write to index file first
+	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
+	if err != nil {
+		return false
+	}
+	defer dbLogFile.Close()
+	dbStat, dbStatErr := dbLogFile.Stat()
+	indexStat, indexStatErr := indexFile.Stat()
+	if dbStatErr != nil || indexStatErr != nil {
+		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+		return false
+	}
+
+	return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateDbFile(dbFileName string, indexFile *os.File) error {
+	db, err := leveldb.OpenFile(dbFileName, nil)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+	return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+		if offset > 0 {
+			levelDbWrite(db, key, offset, size)
+		} else {
+			levelDbDelete(db, key)
+		}
+		return nil
+	})
+}
+
+func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+	bytes := make([]byte, 8)
+	util.Uint64toBytes(bytes, key)
+	data, err := m.db.Get(bytes, nil)
+	if err != nil || len(data) != 8 {
+		glog.V(0).Infof("Failed to get %d %v", key, err)
+		return nil, false
+	}
+	offset := util.BytesToUint32(data[0:4])
+	size := util.BytesToUint32(data[4:8])
+	return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+	var oldSize uint32
+	if oldNeedle, ok := m.Get(key); ok {
+		oldSize = oldNeedle.Size
+	}
+	m.logPut(key, oldSize, size)
+	// write to index file first
+	if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+	}
+	return levelDbWrite(m.db, key, offset, size)
+}
+
+func levelDbWrite(db *leveldb.DB,
+	key uint64, offset uint32, size uint32) error {
+	bytes := make([]byte, 16)
+	util.Uint64toBytes(bytes[0:8], key)
+	util.Uint32toBytes(bytes[8:12], offset)
+	util.Uint32toBytes(bytes[12:16], size)
+	if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
+		return fmt.Errorf("failed to write leveldb: %v", err)
+	}
+	return nil
+}
+func levelDbDelete(db *leveldb.DB, key uint64) error {
+	bytes := make([]byte, 8)
+	util.Uint64toBytes(bytes, key)
+	return db.Delete(bytes, nil)
+}
+
+func (m *LevelDbNeedleMap) Delete(key uint64) error {
+	if oldNeedle, ok := m.Get(key); ok {
+		m.logDelete(oldNeedle.Size)
+	}
+	// write to index file first
+	if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+		return err
+	}
+	return levelDbDelete(m.db, key)
+}
+
+func (m *LevelDbNeedleMap) Close() {
+	m.db.Close()
+}
+
+func (m *LevelDbNeedleMap) Destroy() error {
+	m.Close()
+	os.Remove(m.indexFile.Name())
+	return os.Remove(m.dbFileName)
+}
+
+func (m *LevelDbNeedleMap) ContentSize() uint64 {
+	return m.FileByteCounter
+}
+func (m *LevelDbNeedleMap) DeletedSize() uint64 {
+	return m.DeletionByteCounter
+}
+func (m *LevelDbNeedleMap) FileCount() int {
+	return m.FileCounter
+}
+func (m *LevelDbNeedleMap) DeletedCount() int {
+	return m.DeletionCounter
+}
+func (m *LevelDbNeedleMap) MaxFileKey() uint64 {
+	return m.MaximumFileKey
+}
diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go
new file mode 100644
index 000000000..5fce301bc
--- /dev/null
+++ b/go/storage/needle_map_memory.go
@@ -0,0 +1,126 @@
+package storage
+
+import (
+	"io"
+	"os"
+
+	"github.com/chrislusf/weed-fs/go/glog"
+	"github.com/chrislusf/weed-fs/go/util"
+)
+
+type NeedleMap struct {
+	indexFile *os.File
+	m         CompactMap
+
+	mapMetric
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+	nm := &NeedleMap{
+		m:         NewCompactMap(),
+		indexFile: file,
+	}
+	return nm
+}
+
+const (
+	RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+	nm := NewNeedleMap(file)
+	e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
+		if key > nm.MaximumFileKey {
+			nm.MaximumFileKey = key
+		}
+		nm.FileCounter++
+		nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+		if offset > 0 {
+			oldSize := nm.m.Set(Key(key), offset, size)
+			glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+			if oldSize > 0 {
+				nm.DeletionCounter++
+				nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+			}
+		} else {
+			oldSize := nm.m.Delete(Key(key))
+			glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+			nm.DeletionCounter++
+			nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+		}
+		return nil
+	})
+	glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+	return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
+	var readerOffset int64
+	bytes := make([]byte, 16*RowsToRead)
+	count, e := r.ReadAt(bytes, readerOffset)
+	glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+	readerOffset += int64(count)
+	var (
+		key          uint64
+		offset, size uint32
+		i            int
+	)
+
+	for count > 0 && e == nil || e == io.EOF {
+		for i = 0; i+16 <= count; i += 16 {
+			key = util.BytesToUint64(bytes[i : i+8])
+			offset = util.BytesToUint32(bytes[i+8 : i+12])
+			size = util.BytesToUint32(bytes[i+12 : i+16])
+			if e = fn(key, offset, size); e != nil {
+				return e
+			}
+		}
+		if e == io.EOF {
+			return nil
+		}
+		count, e = r.ReadAt(bytes, readerOffset)
+		glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+		readerOffset += int64(count)
+	}
+	return e
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
+	oldSize := nm.m.Set(Key(key), offset, size)
+	nm.logPut(key, oldSize, size)
+	return appendToIndexFile(nm.indexFile, key, offset, size)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+	element, ok = nm.m.Get(Key(key))
+	return
+}
+func (nm *NeedleMap) Delete(key uint64) error {
+	deletedBytes := nm.m.Delete(Key(key))
+	nm.logDelete(deletedBytes)
+	return appendToIndexFile(nm.indexFile, key, 0, 0)
+}
+func (nm *NeedleMap) Close() {
+	_ = nm.indexFile.Close()
+}
+func (nm *NeedleMap) Destroy() error {
+	nm.Close()
+	return os.Remove(nm.indexFile.Name())
+}
+func (nm NeedleMap) ContentSize() uint64 {
+	return nm.FileByteCounter
+}
+func (nm NeedleMap) DeletedSize() uint64 {
+	return nm.DeletionByteCounter
+}
+func (nm NeedleMap) FileCount() int {
+	return nm.FileCounter
+}
+func (nm NeedleMap) DeletedCount() int {
+	return nm.DeletionCounter
+}
+
+func (nm NeedleMap) MaxFileKey() uint64 {
+	return nm.MaximumFileKey
+}
diff --git a/go/storage/store.go b/go/storage/store.go
index 2695537f6..65e5b218b 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
 	return
 }
 
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
 	s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
 	s.Locations = make([]*DiskLocation, 0)
 	for i := 0; i < len(dirnames); i++ {
 		location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
 		location.volumes = make(map[VolumeId]*Volume)
-		location.loadExistingVolumes()
+		location.loadExistingVolumes(useLevelDb)
 		s.Locations = append(s.Locations, location)
 	}
 	return
 }
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
 	rt, e := NewReplicaPlacementFromString(replicaPlacement)
 	if e != nil {
 		return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
 		if err != nil {
 			return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
 		}
-		e = s.addVolume(VolumeId(id), collection, rt, ttl)
+		e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
 	} else {
 		pair := strings.Split(range_string, "-")
 		start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
 			return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
 		}
 		for id := start; id <= end; id++ {
-			if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
+			if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
 				e = err
 			}
 		}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
 	}
 	return ret
 }
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
 	if s.findVolume(vid) != nil {
 		return fmt.Errorf("Volume Id %d already exists!", vid)
 	}
 	if location := s.findFreeLocation(); location != nil {
 		glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
 			location.Directory, vid, collection, replicaPlacement, ttl)
-		if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
+		if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
 			location.volumes[vid] = volume
 			return nil
 		} else {
@@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep
 	return fmt.Errorf("No more free space left")
 }
 
-func (s *Store) FreezeVolume(volumeIdString string) error {
-	vid, err := NewVolumeId(volumeIdString)
-	if err != nil {
-		return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
-	}
-	if v := s.findVolume(vid); v != nil {
-		if v.readOnly {
-			return fmt.Errorf("Volume %s is already read-only", volumeIdString)
-		}
-		return v.freeze()
-	}
-	return fmt.Errorf("volume id %d is not found during freeze", vid)
-}
-func (l *DiskLocation) loadExistingVolumes() {
+func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
 	if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
 		for _, dir := range dirs {
 			name := dir.Name()
@@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() {
 			}
 			if vid, err := NewVolumeId(base); err == nil {
 				if l.volumes[vid] == nil {
-					if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
+					if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
 						l.volumes[vid] = v
 						glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
 					}
@@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) {
 func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
 	s.masterNodes = NewMasterNodes(bootstrapMaster)
 }
-func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
+func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
 	masterNode, e = s.masterNodes.findMaster()
 	if e != nil {
 		return
@@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
 		return "", "", err
 	}
 
-	jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
+	joinUrl := "http://" + masterNode + "/dir/join"
+
+	jsonBlob, err := util.PostBytes(joinUrl, data)
 	if err != nil {
 		s.masterNodes.reset()
 		return "", "", err
 	}
 	var ret operation.JoinResult
 	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
 		return masterNode, "", err
 	}
 	if ret.Error != "" {
@@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
 		}
 		if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
 			glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
-			if _, _, e := s.Join(); e != nil {
+			if _, _, e := s.SendHeartbeatToMaster(); e != nil {
 				glog.V(0).Infoln("error when reporting size:", e)
 			}
 		}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 1988c9aac..2b47fb497 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -27,10 +27,10 @@ type Volume struct {
 	lastModifiedTime uint64 //unix time in seconds
 }
 
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
 	v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
-	e = v.load(true, true)
+	e = v.load(true, true, useLevelDb)
 	return
 }
 func (v *Volume) String() string {
@@ -40,7 +40,7 @@ func (v *Volume) String() string {
 func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
 	v.SuperBlock = SuperBlock{}
-	e = v.load(false, false)
+	e = v.load(false, false, false)
 	return
 }
 func (v *Volume) FileName() (fileName string) {
@@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) {
 	}
 	return
 }
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
 	var e error
 	fileName := v.FileName()
 
@@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
 		e = v.maybeWriteSuperBlock()
 	}
 	if e == nil && alsoLoadIndex {
-		if v.readOnly {
-			if v.ensureConvertIdxToCdb(fileName) {
-				v.nm, e = OpenCdbMap(fileName + ".cdb")
-				return e
-			}
-		}
 		var indexFile *os.File
 		if v.readOnly {
 			glog.V(1).Infoln("open to read file", fileName+".idx")
@@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
 				return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
 			}
 		}
-		glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
-		if v.nm, e = LoadNeedleMap(indexFile); e != nil {
-			glog.V(0).Infoln("loading error:", e)
+		if !useLevelDb {
+			glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+			if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+				glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+			}
+		} else {
+			glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+			}
 		}
 	}
 	return e
@@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
 	}
 	nv, ok := v.nm.Get(n.Id)
 	if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
-		if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+		if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
 			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
 		}
 	}
@@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) {
 	return -1, errors.New("Not Found")
 }
 
-func (v *Volume) freeze() error {
-	if v.readOnly {
-		return nil
-	}
-	nm, ok := v.nm.(*NeedleMap)
-	if !ok {
-		return nil
-	}
-	v.accessLock.Lock()
-	defer v.accessLock.Unlock()
-	bn, _ := baseFilename(v.dataFile.Name())
-	cdbFn := bn + ".cdb"
-	glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
-	err := DumpNeedleMapToCdb(cdbFn, nm)
-	if err != nil {
-		return err
-	}
-	if v.nm, err = OpenCdbMap(cdbFn); err != nil {
-		return err
-	}
-	nm.indexFile.Close()
-	os.Remove(nm.indexFile.Name())
-	v.readOnly = true
-	return nil
-}
-
 func ScanVolumeFile(dirname string, collection string, id VolumeId,
 	visitSuperBlock func(SuperBlock) error,
 	readNeedleBody bool,
@@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
 	modTime = fi.ModTime()
 	return
 }
-func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
-	var indexFile *os.File
-	var e error
-	_, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
-	_, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
-	if cdbCanRead && cdbModTime.After(idxModeTime) {
-		return true
-	}
-	if !cdbCanWrite {
-		return false
-	}
-	if !idxCanRead {
-		glog.V(0).Infoln("Can not read file", fileName+".idx!")
-		return false
-	}
-	glog.V(2).Infoln("opening file", fileName+".idx")
-	if indexFile, e = os.Open(fileName + ".idx"); e != nil {
-		glog.V(0).Infoln("Failed to read file", fileName+".idx !")
-		return false
-	}
-	defer indexFile.Close()
- glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e) - return false - } - return true -} // volume is expired if modified time + volume ttl < now // except when volume is empty diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7e026a61d..9f6f8e35f 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error { } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - if e = v.load(true, false); e != nil { + if e = v.load(true, false, false); e != nil { return e } return nil @@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, err = n.Append(dst, v.Version()); err != nil { diff --git a/go/util/http_util.go b/go/util/http_util.go index 72cab76e1..52579d746 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -26,12 +26,12 @@ func init() { func PostBytes(url string, body []byte) ([]byte, error) { r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) if err != nil { - return nil, err + return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - return nil, err + return nil, fmt.Errorf("Read response body: %v", err) } return b, nil } diff --git a/go/weed/compact.go b/go/weed/compact.go index 71c4ea90f..6ce55a609 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/fix.go b/go/weed/fix.go index b2df07554..d2cd40398 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -52,8 +52,8 @@ func runFix(cmd *Command, args []string) bool { }, false, func(n *storage.Needle, offset int64) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) if n.Size > 0 { - count, pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) - glog.V(2).Infof("saved %d with error %v", count, pe) + pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) + glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") return nm.Delete(n.Id) diff --git a/go/weed/server.go b/go/weed/server.go index 48612a27b..71346de0a 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -68,6 +68,7 @@ var ( volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. 
dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + volumeUseLevelDb = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") @@ -235,6 +236,7 @@ func runServer(cmd *Command, args []string) bool { volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *serverIp, *volumePort, *serverPublicUrl, folders, maxCounts, + *volumeUseLevelDb, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, ) diff --git a/go/weed/volume.go b/go/weed/volume.go index aa2643d20..e2c6ebd94 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -32,6 +32,7 @@ type VolumeServerOptions struct { dataCenter *string rack *string whiteList []string + useLevelDb *bool fixJpgOrientation *bool } @@ -48,6 +49,7 @@ func init() { v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") + v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") } @@ -116,6 +118,7 @@ func runVolume(cmd *Command, args []string) bool { volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, v.folders, v.folderMaxLimits, + *v.useLevelDb, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index e3878fac4..d84b39808 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -20,12 +20,14 @@ type VolumeServer struct { store *storage.Store guard *security.Guard + UseLevelDb bool FixJpgOrientation bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, folders []string, maxCounts []int, + useLevelDb bool, masterNode string, pulseSeconds int, dataCenter string, rack string, whiteList []string, @@ -34,10 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, + UseLevelDb: useLevelDb, FixJpgOrientation: fixJpgOrientation, } vs.SetMasterNode(masterNode) - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts) + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb) vs.guard = security.NewGuard(whiteList, "") @@ -47,7 +50,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) - 
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler)) adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) @@ -66,7 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) for { - master, secretKey, err := vs.store.Join() + master, secretKey, err := vs.store.SendHeartbeatToMaster() if err == nil { if !connected { connected = true @@ -75,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, glog.V(0).Infoln("Volume Server Connected with master at", master) } } else { - glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error()) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err) if connected { connected = false } diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index c84b72db0..0d70a757e 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { @@ -40,17 +40,6 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) } -func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { - //TODO: notify master that this volume will be read-only - err := vs.store.FreezeVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) - } else { - writeJsonError(w, r, http.StatusInternalServerError, err) - } - glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err) -} - func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION