Browse Source

Lazy loading (#3958)

* types packages is imported more than onece

* lazy-loading

* fix bugs

* fix bugs

* fix unit tests

* fix test error

* rename function

* unload ldb after initial startup

* Don't load ldb when starting volume server if ldbtimeout is set.

* remove uncessary unloadldb

* Update weed/command/server.go

Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>

* Update weed/command/volume.go

Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>

Co-authored-by: guol-fnst <goul-fnst@fujitsu.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
pull/3977/head
Guo Lei 2 years ago
committed by GitHub
parent
commit
5b905fb2b7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      weed/command/backup.go
  2. 2
      weed/command/compact.go
  3. 1
      weed/command/server.go
  4. 3
      weed/command/volume.go
  5. 4
      weed/server/volume_grpc_admin.go
  6. 5
      weed/server/volume_server.go
  7. 14
      weed/storage/disk_location.go
  8. 7
      weed/storage/idx_binary_search_test.go
  9. 2
      weed/storage/needle_map.go
  10. 139
      weed/storage/needle_map_leveldb.go
  11. 2
      weed/storage/needle_map_memory.go
  12. 14
      weed/storage/store.go
  13. 4
      weed/storage/volume.go
  14. 17
      weed/storage/volume_loading.go
  15. 7
      weed/storage/volume_read_test.go
  16. 4
      weed/storage/volume_vacuum_test.go
  17. 2
      weed/storage/volume_write_test.go
  18. 2
      weed/util/chunk_cache/chunk_cache_on_disk.go

5
weed/command/backup.go

@ -2,6 +2,7 @@ package command
import ( import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
@ -113,7 +114,7 @@ func runBackup(cmd *Command, args []string) bool {
return true return true
} }
} }
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true return true
@ -138,7 +139,7 @@ func runBackup(cmd *Command, args []string) bool {
// remove the old data // remove the old data
v.Destroy() v.Destroy()
// recreate an empty volume // recreate an empty volume
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
if err != nil { if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true return true

2
weed/command/compact.go

@ -41,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20) preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId) vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0, 0)
if err != nil { if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err) glog.Fatalf("Load Volume [ERROR] %s\n", err)
} }

1
weed/command/server.go

@ -125,6 +125,7 @@ func init() {
serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.ldbTimeout = cmdServer.Flag.Int64("volume.index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size") serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")

3
weed/command/volume.go

@ -68,6 +68,7 @@ type VolumeServerOptions struct {
inflightUploadDataTimeout *time.Duration inflightUploadDataTimeout *time.Duration
hasSlowRead *bool hasSlowRead *bool
readBufferSizeMB *int readBufferSizeMB *int
ldbTimeout *int64
} }
func init() { func init() {
@ -92,6 +93,7 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.ldbTimeout = cmdVolume.Flag.Int64("index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.")
v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size") v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size") v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
@ -249,6 +251,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.inflightUploadDataTimeout, *v.inflightUploadDataTimeout,
*v.hasSlowRead, *v.hasSlowRead,
*v.readBufferSizeMB, *v.readBufferSizeMB,
*v.ldbTimeout,
) )
// starting grpc server // starting grpc server
grpcS := v.startGrpcService(volumeServer) grpcS := v.startGrpcService(volumeServer)

4
weed/server/volume_grpc_admin.go

@ -3,10 +3,11 @@ package weed_server
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/storage"
"path/filepath" "path/filepath"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -50,6 +51,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Preallocate, req.Preallocate,
req.MemoryMapMaxSizeMb, req.MemoryMapMaxSizeMb,
types.ToDiskType(req.DiskType), types.ToDiskType(req.DiskType),
vs.ldbTimout,
) )
if err != nil { if err != nil {

5
weed/server/volume_server.go

@ -41,6 +41,7 @@ type VolumeServer struct {
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
needleMapKind storage.NeedleMapKind needleMapKind storage.NeedleMapKind
ldbTimout int64
FixJpgOrientation bool FixJpgOrientation bool
ReadMode string ReadMode string
compactionBytePerSecond int64 compactionBytePerSecond int64
@ -68,6 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
inflightUploadDataTimeout time.Duration, inflightUploadDataTimeout time.Duration,
hasSlowRead bool, hasSlowRead bool,
readBufferSizeMB int, readBufferSizeMB int,
ldbTimeout int64,
) *VolumeServer { ) *VolumeServer {
v := util.GetViper() v := util.GetViper()
@ -99,12 +101,13 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
inflightUploadDataTimeout: inflightUploadDataTimeout, inflightUploadDataTimeout: inflightUploadDataTimeout,
hasSlowRead: hasSlowRead, hasSlowRead: hasSlowRead,
readBufferSizeMB: readBufferSizeMB, readBufferSizeMB: readBufferSizeMB,
ldbTimout: ldbTimeout,
} }
vs.SeedMasterNodes = masterNodes vs.SeedMasterNodes = masterNodes
vs.checkWithMaster() vs.checkWithMaster()
vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux) handleStaticResources(adminMux)

14
weed/storage/disk_location.go

@ -114,7 +114,7 @@ func getValidVolumeName(basename string) string {
return "" return ""
} }
func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool) bool {
func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
basename := dirEntry.Name() basename := dirEntry.Name()
if dirEntry.IsDir() { if dirEntry.IsDir() {
return false return false
@ -158,7 +158,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
} }
// load the volume // load the volume
v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
if e != nil { if e != nil {
glog.V(0).Infof("new volume %s error %s", volumeName, e) glog.V(0).Infof("new volume %s error %s", volumeName, e)
return false return false
@ -172,7 +172,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return true return true
} }
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
task_queue := make(chan os.DirEntry, 10*concurrency) task_queue := make(chan os.DirEntry, 10*concurrency)
go func() { go func() {
@ -198,7 +198,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
go func() { go func() {
defer wg.Done() defer wg.Done()
for fi := range task_queue { for fi := range task_queue {
_ = l.loadExistingVolume(fi, needleMapKind, true)
_ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
} }
}() }()
} }
@ -206,7 +206,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
} }
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
workerNum := runtime.NumCPU() workerNum := runtime.NumCPU()
val, ok := os.LookupEnv("GOMAXPROCS") val, ok := os.LookupEnv("GOMAXPROCS")
@ -222,7 +222,7 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
workerNum = 10 workerNum = 10
} }
} }
l.concurrentLoadingVolumes(needleMapKind, workerNum)
l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
l.loadAllEcShards() l.loadAllEcShards()
@ -292,7 +292,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
if fileInfo, found := l.LocateVolume(vid); found { if fileInfo, found := l.LocateVolume(vid); found {
return l.loadExistingVolume(fileInfo, needleMapKind, false)
return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
} }
return false return false
} }

7
weed/storage/idx_binary_search_test.go

@ -1,19 +1,20 @@
package storage package storage
import ( import (
"os"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"os"
"testing"
) )
func TestFirstInvalidIndex(t *testing.T) { func TestFirstInvalidIndex(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

2
weed/storage/needle_map.go

@ -47,7 +47,7 @@ type baseNeedleMapper struct {
type TempNeedleMapper interface { type TempNeedleMapper interface {
NeedleMapper NeedleMapper
DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error
UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error
} }
func (nm *baseNeedleMapper) IndexFileSize() uint64 { func (nm *baseNeedleMapper) IndexFileSize() uint64 {

139
weed/storage/needle_map_leveldb.go

@ -5,6 +5,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
@ -26,12 +28,18 @@ var watermarkKey = []byte("idx_entry_watermark")
type LevelDbNeedleMap struct { type LevelDbNeedleMap struct {
baseNeedleMapper baseNeedleMapper
dbFileName string
db *leveldb.DB
dbFileName string
db *leveldb.DB
ldbOpts *opt.Options
ldbAccessLock sync.RWMutex
exitChan chan bool
// no need to use atomic
accessFlag int64
ldbTimeout int64
recordCount uint64 recordCount uint64
} }
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) {
m = &LevelDbNeedleMap{dbFileName: dbFileName} m = &LevelDbNeedleMap{dbFileName: dbFileName}
m.indexFile = indexFile m.indexFile = indexFile
if !isLevelDbFresh(dbFileName, indexFile) { if !isLevelDbFresh(dbFileName, indexFile) {
@ -46,27 +54,36 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
} }
glog.V(1).Infof("Opening %s...", dbFileName) glog.V(1).Infof("Opening %s...", dbFileName)
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
if errors.IsCorrupted(err) {
m.db, err = leveldb.RecoverFile(dbFileName, opts)
if m.ldbTimeout == 0 {
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
if errors.IsCorrupted(err) {
m.db, err = leveldb.RecoverFile(dbFileName, opts)
}
if err != nil {
return
}
} }
glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
err = setWatermark(m.db, watermark)
if err != nil { if err != nil {
glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
return return
} }
} }
glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
err = setWatermark(m.db, watermark)
if err != nil {
glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
return
}
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
if indexLoadError != nil { if indexLoadError != nil {
return nil, indexLoadError return nil, indexLoadError
} }
m.mapMetric = *mm m.mapMetric = *mm
m.ldbTimeout = ldbTimeout
if m.ldbTimeout > 0 {
m.ldbOpts = opts
m.exitChan = make(chan bool, 1)
m.accessFlag = 0
go lazyLoadingRoutine(m)
}
return return
} }
@ -116,6 +133,14 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
bytes := make([]byte, NeedleIdSize) bytes := make([]byte, NeedleIdSize)
if m.ldbTimeout > 0 {
m.ldbAccessLock.RLock()
defer m.ldbAccessLock.RUnlock()
loadErr := reloadLdb(m)
if loadErr != nil {
return nil, false
}
}
NeedleIdToBytes(bytes[0:NeedleIdSize], key) NeedleIdToBytes(bytes[0:NeedleIdSize], key)
data, err := m.db.Get(bytes, nil) data, err := m.db.Get(bytes, nil)
if err != nil || len(data) != OffsetSize+SizeSize { if err != nil || len(data) != OffsetSize+SizeSize {
@ -129,6 +154,14 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error { func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
var oldSize Size var oldSize Size
var watermark uint64 var watermark uint64
if m.ldbTimeout > 0 {
m.ldbAccessLock.RLock()
defer m.ldbAccessLock.RUnlock()
loadErr := reloadLdb(m)
if loadErr != nil {
return loadErr
}
}
if oldNeedle, ok := m.Get(key); ok { if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size oldSize = oldNeedle.Size
} }
@ -188,6 +221,14 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
var watermark uint64 var watermark uint64
if m.ldbTimeout > 0 {
m.ldbAccessLock.RLock()
defer m.ldbAccessLock.RUnlock()
loadErr := reloadLdb(m)
if loadErr != nil {
return loadErr
}
}
oldNeedle, found := m.Get(key) oldNeedle, found := m.Get(key)
if !found || oldNeedle.Size.IsDeleted() { if !found || oldNeedle.Size.IsDeleted() {
return nil return nil
@ -223,6 +264,9 @@ func (m *LevelDbNeedleMap) Close() {
glog.Warningf("close levelDB failed: %v", err) glog.Warningf("close levelDB failed: %v", err)
} }
} }
if m.ldbTimeout > 0 {
m.exitChan <- true
}
} }
func (m *LevelDbNeedleMap) Destroy() error { func (m *LevelDbNeedleMap) Destroy() error {
@ -231,7 +275,7 @@ func (m *LevelDbNeedleMap) Destroy() error {
return os.RemoveAll(m.dbFileName) return os.RemoveAll(m.dbFileName)
} }
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error {
if v.nm != nil { if v.nm != nil {
v.nm.Close() v.nm.Close()
v.nm = nil v.nm = nil
@ -280,6 +324,13 @@ func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *
} }
v.nm = m v.nm = m
v.tmpNm = nil v.tmpNm = nil
m.ldbTimeout = ldbTimeout
if m.ldbTimeout > 0 {
m.ldbOpts = opts
m.exitChan = make(chan bool, 1)
m.accessFlag = 0
go lazyLoadingRoutine(m)
}
return e return e
} }
@ -348,3 +399,61 @@ func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startF
}) })
return err return err
} }
func reloadLdb(m *LevelDbNeedleMap) (err error) {
if m.db != nil {
return nil
}
glog.V(1).Infof("reloading leveldb %s", m.dbFileName)
m.accessFlag = 1
if m.db, err = leveldb.OpenFile(m.dbFileName, m.ldbOpts); err != nil {
if errors.IsCorrupted(err) {
m.db, err = leveldb.RecoverFile(m.dbFileName, m.ldbOpts)
}
if err != nil {
glog.Fatalf("RecoverFile %s failed:%v", m.dbFileName, err)
return err
}
}
return nil
}
func unloadLdb(m *LevelDbNeedleMap) (err error) {
m.ldbAccessLock.Lock()
defer m.ldbAccessLock.Unlock()
if m.db != nil {
glog.V(1).Infof("reached max idle count, unload leveldb, %s", m.dbFileName)
m.db.Close()
m.db = nil
}
return nil
}
func lazyLoadingRoutine(m *LevelDbNeedleMap) (err error) {
glog.V(1).Infof("lazyLoadingRoutine %s", m.dbFileName)
var accessRecord int64
accessRecord = 1
for {
select {
case exit := <-m.exitChan:
if exit {
glog.V(1).Infof("exit from lazyLoadingRoutine")
return nil
}
case <-time.After(time.Hour * 1):
glog.V(1).Infof("timeout %s", m.dbFileName)
if m.accessFlag == 0 {
accessRecord++
glog.V(1).Infof("accessRecord++")
if accessRecord >= m.ldbTimeout {
unloadLdb(m)
}
} else {
glog.V(1).Infof("reset accessRecord %s", m.dbFileName)
// reset accessRecord
accessRecord = 0
}
continue
}
}
}

2
weed/storage/needle_map_memory.go

@ -84,7 +84,7 @@ func (nm *NeedleMap) Destroy() error {
return os.Remove(nm.indexFile.Name()) return os.Remove(nm.indexFile.Name())
} }
func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error {
if v.nm != nil { if v.nm != nil {
v.nm.Close() v.nm.Close()
v.nm = nil v.nm = nil

14
weed/storage/store.go

@ -80,7 +80,7 @@ func (s *Store) String() (str string) {
} }
func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int32, func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int32,
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
@ -93,7 +93,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
location.loadExistingVolumes(needleMapKind)
location.loadExistingVolumes(needleMapKind, ldbTimeout)
}() }()
} }
wg.Wait() wg.Wait()
@ -106,7 +106,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
return return
} }
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil { if e != nil {
return e return e
@ -115,7 +115,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil { if e != nil {
return e return e
} }
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType, ldbTimeout)
return e return e
} }
func (s *Store) DeleteCollection(collection string) (e error) { func (s *Store) DeleteCollection(collection string) (e error) {
@ -158,14 +158,14 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
} }
return ret return ret
} }
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
if s.findVolume(vid) != nil { if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid) return fmt.Errorf("Volume Id %d already exists!", vid)
} }
if location := s.FindFreeLocation(diskType); location != nil { if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl) location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb, ldbTimeout); err == nil {
location.SetVolume(vid, volume) location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid) glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
@ -373,7 +373,7 @@ func (s *Store) SetStopping() {
func (s *Store) LoadNewVolumes() { func (s *Store) LoadNewVolumes() {
for _, location := range s.Locations { for _, location := range s.Locations {
location.loadExistingVolumes(s.NeedleMapKind)
location.loadExistingVolumes(s.NeedleMapKind, 0)
} }
} }

4
weed/storage/volume.go

@ -43,6 +43,7 @@ type Volume struct {
lastCompactIndexOffset uint64 lastCompactIndexOffset uint64
lastCompactRevision uint16 lastCompactRevision uint16
ldbTimeout int64
isCompacting bool isCompacting bool
isCommitCompacting bool isCommitCompacting bool
@ -53,12 +54,13 @@ type Volume struct {
lastIoError error lastIoError error
} }
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk // if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind v.needleMapKind = needleMapKind
v.ldbTimeout = ldbTimeout
e = v.load(true, true, needleMapKind, preallocate) e = v.load(true, true, needleMapKind, preallocate)
v.startWorker() v.startWorker()
return return

17
weed/storage/volume_loading.go

@ -2,9 +2,10 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"os" "os"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -136,7 +137,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
case NeedleMapInMemory: case NeedleMapInMemory:
if v.tmpNm != nil { if v.tmpNm != nil {
glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx")) glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil)
err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
} else { } else {
glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory") glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory")
if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
@ -151,10 +152,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} }
if v.tmpNm != nil { if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb")) glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts)
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else { } else {
glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb")) glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
} }
} }
@ -166,10 +167,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} }
if v.tmpNm != nil { if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb")) glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts)
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else { } else {
glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb")) glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
} }
} }
@ -181,10 +182,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
} }
if v.tmpNm != nil { if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb")) glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts)
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else { } else {
glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb")) glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
} }
} }

7
weed/storage/volume_read_test.go

@ -1,17 +1,18 @@
package storage package storage
import ( import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing"
) )
func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -48,7 +49,7 @@ func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) { func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

4
weed/storage/volume_vacuum_test.go

@ -72,7 +72,7 @@ func TestLDBIndexCompaction(t *testing.T) {
func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }
@ -115,7 +115,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
v.Close() v.Close()
v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0)
v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume reloading: %v", err) t.Fatalf("volume reloading: %v", err)
} }

2
weed/storage/volume_write_test.go

@ -13,7 +13,7 @@ import (
func TestSearchVolumesWithDeletedNeedles(t *testing.T) { func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
if err != nil { if err != nil {
t.Fatalf("volume creation: %v", err) t.Fatalf("volume creation: %v", err)
} }

2
weed/util/chunk_cache/chunk_cache_on_disk.go

@ -69,7 +69,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1 CompactionTableSizeMultiplier: 10, // default value is 1
} }
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0); err != nil {
return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
} }

Loading…
Cancel
Save