Browse Source

mount: add on disk caching

pull/1273/head
Chris Lu 5 years ago
parent
commit
df97da25f9
  1. 10
      weed/command/mount.go
  2. 6
      weed/command/mount_std.go
  3. 7
      weed/command/webdav.go
  4. 31
      weed/filesys/wfs.go
  5. 9
      weed/server/webdav_server.go
  6. 111
      weed/util/chunk_cache/chunk_cache.go
  7. 36
      weed/util/chunk_cache/chunk_cache_in_memory.go
  8. 145
      weed/util/chunk_cache/chunk_cache_on_disk.go
  9. 58
      weed/util/chunk_cache/chunk_cache_on_disk_test.go

10
weed/command/mount.go

@ -1,5 +1,9 @@
package command
import (
"os"
)
type MountOptions struct {
filer *string
filerMountRootPath *string
@ -9,7 +13,8 @@ type MountOptions struct {
replication *string
ttlSec *int
chunkSizeLimitMB *int
chunkCacheCountLimit *int64
cacheDir *string
cacheSizeMB *int64
dataCenter *string
allowOthers *bool
umaskString *string
@ -33,7 +38,8 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
mountOptions.chunkCacheCountLimit = cmdMount.Flag.Int64("chunkCacheCountLimit", 1000, "number of file chunks to cache in memory")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")

6
weed/command/mount_std.go

@ -129,7 +129,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
options = append(options, osSpecificMountOptions()...)
if *option.allowOthers {
options = append(options, fuse.AllowOther())
}
@ -137,12 +136,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options = append(options, fuse.AllowNonEmptyMount())
}
// mount
c, err := fuse.Mount(dir, options...)
if err != nil {
glog.V(0).Infof("mount: %v", err)
return true
}
defer fuse.Unmount(dir)
util.OnInterrupt(func() {
@ -164,7 +163,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ChunkCacheCountLimit: *option.chunkCacheCountLimit,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
DirListCacheLimit: *option.dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,

7
weed/command/webdav.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"os"
"os/user"
"strconv"
"time"
@ -26,6 +27,8 @@ type WebDavOption struct {
collection *string
tlsPrivateKey *string
tlsCertificate *string
cacheDir *string
cacheSizeMB *int64
}
func init() {
@ -35,6 +38,8 @@ func init() {
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
}
var cmdWebDav = &Command{
@ -105,6 +110,8 @@ func (wo *WebDavOption) startWebDav() bool {
Uid: uid,
Gid: gid,
Cipher: cipher,
CacheDir: *wo.cacheDir,
CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)

31
weed/filesys/wfs.go

@ -22,18 +22,19 @@ import (
)
type Option struct {
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
ChunkSizeLimit int64
ChunkCacheCountLimit int64
DataCenter string
DirListCacheLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
ChunkSizeLimit int64
CacheDir string
CacheSizeMB int64
DataCenter string
DirListCacheLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
MountUid uint32
MountGid uint32
@ -72,6 +73,10 @@ type statsCache struct {
}
func NewSeaweedFileSystem(option *Option) *WFS {
chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
util.OnInterrupt(func() {
chunkCache.Shutdown()
})
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
@ -81,7 +86,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return make([]byte, option.ChunkSizeLimit)
},
},
chunkCache: chunk_cache.NewChunkCache(option.ChunkCacheCountLimit),
chunkCache: chunkCache,
}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}

9
weed/server/webdav_server.go

@ -34,6 +34,8 @@ type WebDavOption struct {
Uid uint32
Gid uint32
Cipher bool
CacheDir string
CacheSizeMB int64
}
type WebDavServer struct {
@ -96,9 +98,14 @@ type WebDavFile struct {
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
util.OnInterrupt(func() {
chunkCache.Shutdown()
})
return &WebDavFileSystem{
option: option,
chunkCache: chunk_cache.NewChunkCache(1000),
chunkCache: chunkCache,
}, nil
}

111
weed/util/chunk_cache/chunk_cache.go

@ -1,36 +1,115 @@
package chunk_cache
import (
"time"
"fmt"
"path"
"sort"
"sync"
"github.com/karlseguin/ccache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
// a global cache for recently accessed file chunks
type ChunkCache struct {
cache *ccache.Cache
memCache *ChunkCacheInMemory
diskCaches []*ChunkCacheVolume
sync.RWMutex
}
func NewChunkCache(maxEntries int64) *ChunkCache {
pruneCount := maxEntries >> 3
if pruneCount <= 0 {
pruneCount = 500
func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache {
c := &ChunkCache{
memCache: NewChunkCacheInMemory(maxEntries),
}
return &ChunkCache{
cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
if volumeCount < segmentCount {
volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
}
for i := 0; i < volumeCount; i++ {
fileName := path.Join(dir, fmt.Sprintf("cache_%d", i))
diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
if err != nil {
glog.Errorf("failed to add cache %s : %v", fileName, err)
} else {
c.diskCaches = append(c.diskCaches, diskCache)
}
}
// keep newest cache to the front
sort.Slice(c.diskCaches, func(i, j int) bool {
return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
})
return c
}
func (c *ChunkCache) GetChunk(fileId string) []byte {
item := c.cache.Get(fileId)
if item == nil {
func (c *ChunkCache) GetChunk(fileId string) (data []byte) {
c.RLock()
defer c.RUnlock()
if data = c.memCache.GetChunk(fileId); data != nil {
return data
}
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
glog.Errorf("failed to parse file id %s", fileId)
return nil
}
data := item.Value().([]byte)
item.Extend(time.Hour)
return data
for _, diskCache := range c.diskCaches {
data, err = diskCache.GetNeedle(fid.Key)
if err == storage.ErrorNotFound {
continue
}
if err != nil {
glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId)
continue
}
if len(data) != 0 {
return
}
}
return nil
}
func (c *ChunkCache) SetChunk(fileId string, data []byte) {
c.cache.Set(fileId, data, time.Hour)
c.Lock()
defer c.Unlock()
c.memCache.SetChunk(fileId, data)
if len(c.diskCaches) == 0 {
return
}
if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
if resetErr != nil {
glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
return
}
for i := len(c.diskCaches) - 1; i > 0; i-- {
c.diskCaches[i] = c.diskCaches[i-1]
}
c.diskCaches[0] = t
}
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
glog.Errorf("failed to parse file id %s", fileId)
return
}
c.diskCaches[0].WriteNeedle(fid.Key, data)
}
func (c *ChunkCache) Shutdown() {
c.Lock()
defer c.Unlock()
for _, diskCache := range c.diskCaches {
diskCache.Shutdown()
}
}

36
weed/util/chunk_cache/chunk_cache_in_memory.go

@ -0,0 +1,36 @@
package chunk_cache
import (
"time"
"github.com/karlseguin/ccache"
)
// a global cache for recently accessed file chunks
type ChunkCacheInMemory struct {
cache *ccache.Cache
}
func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
pruneCount := maxEntries >> 3
if pruneCount <= 0 {
pruneCount = 500
}
return &ChunkCacheInMemory{
cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
}
}
func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
item := c.cache.Get(fileId)
if item == nil {
return nil
}
data := item.Value().([]byte)
item.Extend(time.Hour)
return data
}
func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
c.cache.Set(fileId, data, time.Hour)
}

145
weed/util/chunk_cache/chunk_cache_on_disk.go

@ -0,0 +1,145 @@
package chunk_cache
import (
"fmt"
"os"
"time"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
// This implements an on disk cache
// The entries are an FIFO with a size limit
type ChunkCacheVolume struct {
DataBackend backend.BackendStorageFile
nm storage.NeedleMapper
fileName string
smallBuffer []byte
sizeLimit int64
lastModTime time.Time
fileSize int64
}
func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
v := &ChunkCacheVolume{
smallBuffer: make([]byte, types.NeedlePaddingSize),
fileName: fileName,
sizeLimit: preallocate,
}
var err error
if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
if !canRead {
return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
}
if !canWrite {
return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
}
if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
} else {
v.DataBackend = backend.NewDiskFile(dataFile)
v.lastModTime = modTime
v.fileSize = fileSize
}
} else {
if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
}
v.lastModTime = time.Now()
}
var indexFile *os.File
if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
}
glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
opts := &opt.Options{
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
}
return v, nil
}
func (v *ChunkCacheVolume) Shutdown() {
if v.DataBackend != nil {
v.DataBackend.Close()
v.DataBackend = nil
}
if v.nm != nil {
v.nm.Close()
v.nm = nil
}
}
func (v *ChunkCacheVolume) destroy() {
v.Shutdown()
os.Remove(v.fileName + ".dat")
os.Remove(v.fileName + ".idx")
os.RemoveAll(v.fileName + ".ldb")
}
func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
v.destroy()
return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
}
func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
nv, ok := v.nm.Get(key)
if !ok {
return nil, storage.ErrorNotFound
}
data := make([]byte, nv.Size)
if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
} else {
if readSize != int(nv.Size) {
return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
}
}
return data, nil
}
func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
offset := v.fileSize
written, err := v.DataBackend.WriteAt(data, offset)
if err != nil {
return err
} else if written != len(data) {
return fmt.Errorf("partial written %d, expected %d", written, len(data))
}
v.fileSize += int64(written)
extraSize := written % types.NeedlePaddingSize
if extraSize != 0 {
v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written))
v.fileSize += int64(types.NeedlePaddingSize - extraSize)
}
if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", key, err)
}
return nil
}

58
weed/util/chunk_cache/chunk_cache_on_disk_test.go

@ -0,0 +1,58 @@
package chunk_cache
import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"
)
func TestOnDisk(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "c")
defer os.RemoveAll(tmpDir)
totalDiskSizeMb := int64(6)
segmentCount := 2
cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)
writeCount := 5
type test_data struct {
data []byte
fileId string
}
testData := make([]*test_data, writeCount)
for i:=0;i<writeCount;i++{
buff := make([]byte, 1024*1024)
rand.Read(buff)
testData[i] = &test_data{
data: buff,
fileId: fmt.Sprintf("1,%daabbccdd", i+1),
}
cache.SetChunk(testData[i].fileId, testData[i].data)
}
for i:=0;i<writeCount;i++{
data := cache.GetChunk(testData[i].fileId)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
}
cache.Shutdown()
cache = NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount)
for i:=0;i<writeCount;i++{
data := cache.GetChunk(testData[i].fileId)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
}
cache.Shutdown()
}
Loading…
Cancel
Save