Browse Source

refactor memory mapped file into backend storage

pull/1115/head
Chris Lu 5 years ago
parent
commit
85f8649320
  1. 2
      weed/storage/backend/memory_map/memory_map.go
  2. 60
      weed/storage/backend/memory_map/memory_map_backend.go
  3. 60
      weed/storage/needle/needle_read_write.go
  4. 5
      weed/storage/volume_create.go
  5. 5
      weed/storage/volume_create_linux.go
  6. 32
      weed/storage/volume_create_windows.go
  7. 6
      weed/storage/volume_loading.go
  8. 7
      weed/storage/volume_read_write.go
  9. 60
      weed/storage/volume_super_block.go
  10. 5
      weed/storage/volume_vacuum.go

2
weed/storage/backend/memory_map/memory_map.go

@ -21,8 +21,6 @@ type MemoryMap struct {
End_of_file int64
}
var FileMemoryMap = make(map[string]*MemoryMap)
func ReadMemoryMapMaxSizeMb(memoryMapMaxSizeMbString string) (uint32, error) {
if memoryMapMaxSizeMbString == "" {
return 0, nil

60
weed/storage/backend/memory_map/memory_map_backend.go

@ -0,0 +1,60 @@
package memory_map
import (
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
var (
_ backend.DataStorageBackend = &MemoryMappedFile{}
)
type MemoryMappedFile struct {
mm *MemoryMap
}
func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile {
mmf := &MemoryMappedFile{
mm : new(MemoryMap),
}
mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB))
return mmf
}
func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) {
readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p)))
if e != nil {
return 0, e
}
// TODO avoid the extra copy
copy(p, readBytes)
return len(readBytes), nil
}
func (mmf *MemoryMappedFile) WriteAt(p []byte, off int64) (n int, err error) {
mmf.mm.WriteMemory(uint64(off), uint64(len(p)), p)
return len(p), nil
}
func (mmf *MemoryMappedFile) Truncate(off int64) error {
return nil
}
func (mmf *MemoryMappedFile) Close() error {
mmf.mm.DeleteFileAndMemoryMap()
return nil
}
func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err error) {
stat, e := mmf.mm.File.Stat()
if e == nil {
return stat.Size(), stat.ModTime(), nil
}
return 0, time.Time{}, err
}
func (mmf *MemoryMappedFile) String() string {
return mmf.mm.File.Name()
}

60
weed/storage/needle/needle_read_write.go

@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@ -128,33 +127,24 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) {
mMap, exists := memory_map.FileMemoryMap[w.String()]
if !exists {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.DataStorageBackend, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
}
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.DataStorageBackend, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
}
}(w, end)
offset = uint64(end)
} else {
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
return
}
}
}(w, end)
offset = uint64(end)
} else {
offset = uint64(mMap.End_of_file + 1)
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
return
}
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
if err == nil {
if exists {
mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
} else {
_, err = w.WriteAt(bytesToWrite, int64(offset))
}
_, err = w.WriteAt(bytesToWrite, int64(offset))
}
return offset, size, actualSize, err
@ -165,14 +155,9 @@ func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, ver
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
mMap, exists := memory_map.FileMemoryMap[r.String()]
if exists {
dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize))
return dataSlice, err
} else {
_, err = r.ReadAt(dataSlice, offset)
return dataSlice, err
}
_, err = r.ReadAt(dataSlice, offset)
return dataSlice, err
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
@ -286,19 +271,12 @@ func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int6
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
mMap, exists := memory_map.FileMemoryMap[r.String()]
if exists {
bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize)
if err != nil {
return nil, bytes, 0, err
}
} else {
var count int
count, err = r.ReadAt(bytes, offset)
if count <= 0 || err != nil {
return nil, bytes, 0, err
}
var count int
count, err = r.ReadAt(bytes, offset)
if count <= 0 || err != nil {
return nil, bytes, 0, err
}
n.ParseNeedleHeader(bytes)
bodyLength = NeedleBodyLength(n.Size, version)
}

5
weed/storage/volume_create.go

@ -6,12 +6,13 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
return file, e
return backend.NewDiskFile(file), e
}

5
weed/storage/volume_create_linux.go

@ -7,13 +7,14 @@ import (
"syscall"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
return file, e
return backend.NewDiskFile(file), e
}

32
weed/storage/volume_create_windows.go

@ -3,36 +3,26 @@
package storage
import (
"os"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"golang.org/x/sys/windows"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
mMap, exists := memory_map.FileMemoryMap[fileName]
if !exists {
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
if memoryMapSizeMB > 0 {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
new_mMap := memory_map.FileMemoryMap[fileName]
new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB))
return file, e
} else {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
return file, e
}
if memoryMapSizeMB > 0 {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e
} else {
return mMap.File, nil
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
return backend.NewDiskFile(file), e
}
}

6
weed/storage/volume_loading.go

@ -25,13 +25,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
var e error
fileName := v.FileName()
alreadyHasSuperBlock := false
var dataFile *os.File
// open dat file
if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
var dataFile *os.File
if canWrite {
dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
} else {
@ -43,14 +43,14 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if fileSize >= _SuperBlockSize {
alreadyHasSuperBlock = true
}
v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
} else {
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
}
}
v.DataBackend = backend.NewDiskFile(dataFile)
if e != nil {
if !os.IsPermission(e) {

7
weed/storage/volume_read_write.go

@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@ -46,12 +45,6 @@ func (v *Volume) Destroy() (err error) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}
mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()]
if exists {
mMap.DeleteFileAndMemoryMap()
delete(memory_map.FileMemoryMap, v.DataBackend.String())
}
v.Close()
os.Remove(v.FileName() + ".dat")
os.Remove(v.FileName() + ".idx")

60
weed/storage/volume_super_block.go

@ -4,11 +4,9 @@ import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@ -78,35 +76,26 @@ func (s *SuperBlock) Initialized() bool {
func (v *Volume) maybeWriteSuperBlock() error {
mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()]
if exists {
if mMap.End_of_file == -1 {
v.SuperBlock.version = needle.CurrentVersion
mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
}
return nil
} else {
datSize, _, e := v.DataBackend.GetStat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
return e
}
if datSize == 0 {
v.SuperBlock.version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
var dataFile *os.File
if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
v.readOnly = false
}
datSize, _, e := v.DataBackend.GetStat()
if e != nil {
glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
return e
}
if datSize == 0 {
v.SuperBlock.version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
var dataFile *os.File
if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
v.readOnly = false
}
}
}
return e
}
return e
}
func (v *Volume) readSuperBlock() (err error) {
@ -118,18 +107,9 @@ func (v *Volume) readSuperBlock() (err error) {
func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
mMap, exists := memory_map.FileMemoryMap[datBackend.String()]
if exists {
header, err = mMap.ReadMemory(0, _SuperBlockSize)
if err != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), err)
return
}
} else {
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
return
}
if _, e := datBackend.ReadAt(header, 0); e != nil {
err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
return
}
superBlock.version = needle.Version(header[0])

5
weed/storage/volume_vacuum.go

@ -312,7 +312,8 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
dst, idx *os.File
dst backend.DataStorageBackend
idx *os.File
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
@ -328,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
v: v,
now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx),
dstBackend: backend.NewDiskFile(dst),
dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)

Loading…
Cancel
Save