diff --git a/weed/command/mount.go b/weed/command/mount.go index adf384a6f..6165402b4 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,5 +1,9 @@ package command +import ( + "os" +) + type MountOptions struct { filer *string filerMountRootPath *string @@ -9,7 +13,8 @@ type MountOptions struct { replication *string ttlSec *int chunkSizeLimitMB *int - chunkCacheCountLimit *int64 + cacheDir *string + cacheSizeMB *int64 dataCenter *string allowOthers *bool umaskString *string @@ -33,7 +38,8 @@ func init() { mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") - mountOptions.chunkCacheCountLimit = cmdMount.Flag.Int64("chunkCacheCountLimit", 1000, "number of file chunks to cache in memory") + mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 148540dec..0f87d6aee 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -129,7 +129,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } options = append(options, osSpecificMountOptions()...) - if *option.allowOthers { options = append(options, fuse.AllowOther()) } @@ -137,12 +136,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { options = append(options, fuse.AllowNonEmptyMount()) } + // mount c, err := fuse.Mount(dir, options...) if err != nil { glog.V(0).Infof("mount: %v", err) return true } - defer fuse.Unmount(dir) util.OnInterrupt(func() { @@ -164,7 +163,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { Replication: *option.replication, TtlSec: int32(*option.ttlSec), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - ChunkCacheCountLimit: *option.chunkCacheCountLimit, + CacheDir: *option.cacheDir, + CacheSizeMB: *option.cacheSizeMB, DataCenter: *option.dataCenter, DirListCacheLimit: *option.dirListCacheLimit, EntryCacheTtl: 3 * time.Second, diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 4f5d5f5ce..a1616d0fc 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "os/user" "strconv" "time" @@ -26,6 +27,8 @@ type WebDavOption struct { collection *string tlsPrivateKey *string tlsCertificate *string + cacheDir *string + cacheSizeMB *int64 } func init() { @@ -35,6 +38,8 @@ func init() { webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") + webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB") } var cmdWebDav = &Command{ @@ -105,6 +110,8 @@ func (wo *WebDavOption) startWebDav() bool { Uid: uid, Gid: gid, Cipher: cipher, + CacheDir: *wo.cacheDir, + CacheSizeMB: *wo.cacheSizeMB, }) if webdavServer_err != nil { glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 49db18b6e..b2f68c030 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -22,18 +22,19 @@ import ( ) type Option struct { - FilerGrpcAddress string - GrpcDialOption grpc.DialOption - FilerMountRootPath string - Collection string - Replication string - TtlSec int32 - ChunkSizeLimit int64 - ChunkCacheCountLimit int64 - DataCenter string - DirListCacheLimit int64 - EntryCacheTtl time.Duration - Umask os.FileMode + FilerGrpcAddress string + GrpcDialOption grpc.DialOption + FilerMountRootPath string + Collection string + Replication string + TtlSec int32 + ChunkSizeLimit int64 + CacheDir string + CacheSizeMB int64 + DataCenter string + DirListCacheLimit int64 + EntryCacheTtl time.Duration + Umask os.FileMode MountUid uint32 MountGid uint32 @@ -72,6 +73,10 @@ type statsCache struct { } func NewSeaweedFileSystem(option *Option) *WFS { + chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4) + util.OnInterrupt(func() { + chunkCache.Shutdown() + }) wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), @@ -81,7 +86,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, - chunkCache: chunk_cache.NewChunkCache(option.ChunkCacheCountLimit), + chunkCache: chunkCache, } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 082755291..affc953bc 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -34,6 +34,8 @@ type WebDavOption struct { Uid uint32 Gid uint32 Cipher bool + CacheDir string + CacheSizeMB int64 } type WebDavServer struct { @@ -96,9 +98,14 @@ type WebDavFile struct { } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + + chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4) + util.OnInterrupt(func() { + chunkCache.Shutdown() + }) return &WebDavFileSystem{ option: option, - chunkCache: chunk_cache.NewChunkCache(1000), + chunkCache: chunkCache, }, nil } diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index e2676d9cc..682f5185a 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -1,36 +1,115 @@ package chunk_cache import ( - "time" + "fmt" + "path" + "sort" + "sync" - "github.com/karlseguin/ccache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // a global cache for recently accessed file chunks type ChunkCache struct { - cache *ccache.Cache + memCache *ChunkCacheInMemory + diskCaches []*ChunkCacheVolume + sync.RWMutex } -func NewChunkCache(maxEntries int64) *ChunkCache { - pruneCount := maxEntries >> 3 - if pruneCount <= 0 { - pruneCount = 500 +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64, segmentCount int) *ChunkCache { + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), } - return &ChunkCache{ - cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("cache_%d", i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c } -func (c *ChunkCache) GetChunk(fileId string) []byte { - item := c.cache.Get(fileId) - if item == nil { +func (c *ChunkCache) GetChunk(fileId string) (data []byte) { + c.RLock() + defer c.RUnlock() + + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) return nil } - data := item.Value().([]byte) - item.Extend(time.Hour) - return data + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(fid.Key) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %s", diskCache.fileName, fileId) + continue + } + if len(data) != 0 { + return + } + } + return nil } func (c *ChunkCache) SetChunk(fileId string, data []byte) { - c.cache.Set(fileId, data, time.Hour) + c.Lock() + defer c.Unlock() + + c.memCache.SetChunk(fileId, data) + + if len(c.diskCaches) == 0 { + return + } + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + c.diskCaches[0].WriteNeedle(fid.Key, data) + } + +func (c *ChunkCache) Shutdown() { + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } +} \ No newline at end of file diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..2c7ef8d39 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", key, err) + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..256b10139 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,58 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(6) + segmentCount := 2 + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb, segmentCount) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + } + testData := make([]*test_data, writeCount) + for i:=0;i