Browse Source

refactoring needle mapper interface to separate index file storage logic

out
pull/2/head
Chris Lu 12 years ago
parent
commit
fb635146a1
  1. 2
      go/replication/store_replicate.go
  2. 67
      go/storage/needle_map.go
  3. 22
      go/storage/store.go
  4. 14
      go/storage/volume.go

2
go/replication/store_replicate.go

@ -36,7 +36,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
if errorStatus != "" { if errorStatus != "" {
if _, err = s.Delete(volumeId, needle); err != nil { if _, err = s.Delete(volumeId, needle); err != nil {
errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
strconv.FormatUint(uint64(volumeId), 10) + ": " + err.Error()
volumeId.String() + ": " + err.Error()
} else { } else {
distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")

67
go/storage/needle_map.go

@ -1,12 +1,32 @@
package storage package storage
import ( import (
"bufio"
"code.google.com/p/weed-fs/go/util" "code.google.com/p/weed-fs/go/util"
"fmt" "fmt"
"io" "io"
"os" "os"
) )
type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) (int, error)
Get(key uint64) (element *NeedleValue, ok bool)
Delete(key uint64) error
Close()
ContentSize() uint64
DeletedSize() uint64
FileCount() int
DeletedCount() int
Visit(visit func(NeedleValue) error) (err error)
}
type mapMetric struct {
DeletionCounter int `json:"DeletionCounter"`
FileCounter int `json:"FileCounter"`
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
FileByteCounter uint64 `json:"FileByteCounter"`
}
type NeedleMap struct { type NeedleMap struct {
indexFile *os.File indexFile *os.File
m CompactMap m CompactMap
@ -14,10 +34,7 @@ type NeedleMap struct {
//transient //transient
bytes []byte bytes []byte
deletionCounter int
fileCounter int
deletionByteCounter uint64
fileByteCounter uint64
mapMetric
} }
func NewNeedleMap(file *os.File) *NeedleMap { func NewNeedleMap(file *os.File) *NeedleMap {
@ -35,31 +52,32 @@ const (
func LoadNeedleMap(file *os.File) (*NeedleMap, error) { func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file) nm := NewNeedleMap(file)
bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024)
bytes := make([]byte, 16*RowsToRead) bytes := make([]byte, 16*RowsToRead)
count, e := nm.indexFile.Read(bytes)
count, e := bufferReader.Read(bytes)
for count > 0 && e == nil { for count > 0 && e == nil {
for i := 0; i < count; i += 16 { for i := 0; i < count; i += 16 {
key := util.BytesToUint64(bytes[i : i+8]) key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12]) offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16]) size := util.BytesToUint32(bytes[i+12 : i+16])
nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
if offset > 0 { if offset > 0 {
oldSize := nm.m.Set(Key(key), offset, size) oldSize := nm.m.Set(Key(key), offset, size)
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
if oldSize > 0 { if oldSize > 0 {
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
} else { } else {
oldSize := nm.m.Delete(Key(key)) oldSize := nm.m.Delete(Key(key))
//log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
} }
count, e = nm.indexFile.Read(bytes)
count, e = bufferReader.Read(bytes)
} }
if e == io.EOF { if e == io.EOF {
e = nil e = nil
@ -72,11 +90,11 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
util.Uint64toBytes(nm.bytes[0:8], key) util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[8:12], offset)
util.Uint32toBytes(nm.bytes[12:16], size) util.Uint32toBytes(nm.bytes[12:16], size)
nm.fileCounter++
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
if oldSize > 0 { if oldSize > 0 {
nm.deletionCounter++
nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
return nm.indexFile.Write(nm.bytes) return nm.indexFile.Write(nm.bytes)
} }
@ -85,7 +103,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
return return
} }
func (nm *NeedleMap) Delete(key uint64) error { func (nm *NeedleMap) Delete(key uint64) error {
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
offset, err := nm.indexFile.Seek(0, 1) offset, err := nm.indexFile.Seek(0, 1)
if err != nil { if err != nil {
return fmt.Errorf("cannot get position of indexfile: %s", err) return fmt.Errorf("cannot get position of indexfile: %s", err)
@ -100,14 +118,23 @@ func (nm *NeedleMap) Delete(key uint64) error {
} }
return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus) return fmt.Errorf("error writing to indexfile %s: %s%s", nm.indexFile, err, plus)
} }
nm.deletionCounter++
nm.DeletionCounter++
return nil return nil
} }
func (nm *NeedleMap) Close() { func (nm *NeedleMap) Close() {
_ = nm.indexFile.Close() _ = nm.indexFile.Close()
} }
func (nm *NeedleMap) ContentSize() uint64 {
return nm.fileByteCounter
func (nm NeedleMap) ContentSize() uint64 {
return nm.FileByteCounter
}
func (nm NeedleMap) DeletedSize() uint64 {
return nm.DeletionByteCounter
}
func (nm NeedleMap) FileCount() int {
return nm.FileCounter
}
func (nm NeedleMap) DeletedCount() int {
return nm.DeletionCounter
} }
func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
return nm.m.Visit(visit) return nm.m.Visit(visit)

22
go/storage/store.go

@ -121,9 +121,11 @@ func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo var stats []*VolumeInfo
for k, v := range s.volumes { for k, v := range s.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
ReadOnly: v.readOnly}
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
stats = append(stats, s) stats = append(stats, s)
} }
return stats return stats
@ -140,9 +142,11 @@ func (s *Store) Join() error {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
ReadOnly: v.readOnly}
RepType: v.ReplicaType, Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly}
*stats = append(*stats, s) *stats = append(*stats, s)
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)
@ -175,8 +179,8 @@ func (s *Store) Close() {
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
if v := s.volumes[i]; v != nil { if v := s.volumes[i]; v != nil {
if v.readOnly { if v.readOnly {
err = errors.New("Volume " + i.String() + " is read only!")
return
err = errors.New("Volume " + i.String() + " is read only!")
return
} else { } else {
size, err = v.write(n) size, err = v.write(n)
if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
@ -189,7 +193,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
return return
} }
log.Println("volume", i, "not found!") log.Println("volume", i, "not found!")
err = errors.New("Volume " + i.String() + " not found!")
err = errors.New("Volume " + i.String() + " not found!")
return return
} }
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {

14
go/storage/volume.go

@ -30,7 +30,7 @@ type Volume struct {
Id VolumeId Id VolumeId
dir string dir string
dataFile *os.File dataFile *os.File
nm *NeedleMap
nm NeedleMapper
readOnly bool readOnly bool
SuperBlock SuperBlock
@ -70,10 +70,10 @@ func (v *Volume) load(alsoLoadIndex bool) error {
e = v.maybeWriteSuperBlock() e = v.maybeWriteSuperBlock()
} }
if e == nil && alsoLoadIndex { if e == nil && alsoLoadIndex {
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
}
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
}
v.nm, e = LoadNeedleMap(indexFile) v.nm, e = LoadNeedleMap(indexFile)
} }
return e return e
@ -198,7 +198,7 @@ func (v *Volume) read(n *Needle) (int, error) {
} }
func (v *Volume) garbageLevel() float64 { func (v *Volume) garbageLevel() float64 {
return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
} }
func (v *Volume) compact() error { func (v *Volume) compact() error {
@ -305,5 +305,5 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
return return
} }
func (v *Volume) ContentSize() uint64 { func (v *Volume) ContentSize() uint64 {
return v.nm.fileByteCounter
return v.nm.ContentSize()
} }
Loading…
Cancel
Save