Browse Source

Fix a few data races when reading files in mount (#3527)

pull/3530/head
Patrick Schmidt 2 years ago
committed by GitHub
parent
commit
5df105b1f9
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      weed/filer/reader_cache.go
  2. 20
      weed/filer/reader_pattern.go
  3. 14
      weed/mount/filehandle.go
  4. 7
      weed/mount/filehandle_map.go
  5. 3
      weed/mount/weedfs.go
  6. 8
      weed/mount/weedfs_dir_lookup.go

25
weed/filer/reader_cache.go

@ -3,6 +3,7 @@ package filer
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
@ -30,7 +31,7 @@ type SingleChunkCacher struct {
shouldCache bool shouldCache bool
wg sync.WaitGroup wg sync.WaitGroup
cacheStartedCh chan struct{} cacheStartedCh chan struct{}
completedTime time.Time
completedTimeNew int64
} }
func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@ -50,13 +51,17 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
rc.Lock() rc.Lock()
defer rc.Unlock() defer rc.Unlock()
if len(rc.downloaders) >= rc.limit {
return
}
for _, chunkView := range chunkViews { for _, chunkView := range chunkViews {
if _, found := rc.downloaders[chunkView.FileId]; found { if _, found := rc.downloaders[chunkView.FileId]; found {
continue continue
} }
if len(rc.downloaders) >= rc.limit { if len(rc.downloaders) >= rc.limit {
// if still no slots, return
// abort when slots are filled
return return
} }
@ -74,27 +79,28 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock() rc.Lock()
defer rc.Unlock()
if cacher, found := rc.downloaders[fileId]; found { if cacher, found := rc.downloaders[fileId]; found {
if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil { if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
rc.Unlock()
return n, err return n, err
} }
} }
if shouldCache || rc.lookupFileIdFn == nil { if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
if n > 0 { if n > 0 {
rc.Unlock()
return n, err return n, err
} }
} }
// clean up old downloaders // clean up old downloaders
if len(rc.downloaders) >= rc.limit { if len(rc.downloaders) >= rc.limit {
oldestFid, oldestTime := "", time.Now()
oldestFid, oldestTime := "", time.Now().Unix()
for fid, downloader := range rc.downloaders { for fid, downloader := range rc.downloaders {
if !downloader.completedTime.IsZero() {
if downloader.completedTime.Before(oldestTime) {
oldestFid, oldestTime = fid, downloader.completedTime
}
completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
if completedTime > 0 && completedTime < oldestTime {
oldestFid, oldestTime = fid, completedTime
} }
} }
if oldestFid != "" { if oldestFid != "" {
@ -110,6 +116,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
go cacher.startCaching() go cacher.startCaching()
<-cacher.cacheStartedCh <-cacher.cacheStartedCh
rc.downloaders[fileId] = cacher rc.downloaders[fileId] = cacher
rc.Unlock()
return cacher.readChunkAt(buffer, offset) return cacher.readChunkAt(buffer, offset)
} }
@ -172,7 +179,7 @@ func (s *SingleChunkCacher) startCaching() {
if s.shouldCache { if s.shouldCache {
s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
} }
s.completedTime = time.Now()
atomic.StoreInt64(&s.completedTimeNew, time.Now().Unix())
return return
} }

20
weed/filer/reader_pattern.go

@ -1,5 +1,9 @@
package filer package filer
import (
"sync/atomic"
)
type ReaderPattern struct { type ReaderPattern struct {
isSequentialCounter int64 isSequentialCounter int64
lastReadStopOffset int64 lastReadStopOffset int64
@ -18,18 +22,20 @@ func NewReaderPattern() *ReaderPattern {
} }
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
if rp.lastReadStopOffset == offset {
if rp.isSequentialCounter < ModeChangeLimit {
rp.isSequentialCounter++
lastOffset := atomic.SwapInt64(&rp.lastReadStopOffset, offset+int64(size))
counter := atomic.LoadInt64(&rp.isSequentialCounter)
if lastOffset == offset {
if counter < ModeChangeLimit {
atomic.AddInt64(&rp.isSequentialCounter, 1)
} }
} else { } else {
if rp.isSequentialCounter > -ModeChangeLimit {
rp.isSequentialCounter--
if counter > -ModeChangeLimit {
atomic.AddInt64(&rp.isSequentialCounter, -1)
} }
} }
rp.lastReadStopOffset = offset + int64(size)
} }
func (rp *ReaderPattern) IsRandomMode() bool { func (rp *ReaderPattern) IsRandomMode() bool {
return rp.isSequentialCounter < 0
return atomic.LoadInt64(&rp.isSequentialCounter) < 0
} }

14
weed/mount/filehandle.go

@ -1,12 +1,14 @@
package mount package mount
import ( import (
"sync"
"golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/exp/slices"
"sync"
) )
type FileHandleId uint64 type FileHandleId uint64
@ -57,12 +59,20 @@ func (fh *FileHandle) GetEntry() *filer_pb.Entry {
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()
return fh.entry return fh.entry
} }
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
fh.entryLock.Lock() fh.entryLock.Lock()
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()
fh.entry = entry fh.entry = entry
} }
// UpdateEntry runs fn on the file handle's current entry while holding
// entryLock, then returns fh.entry as it is after fn has returned.
//
// fn receives fh.entry as-is; nothing here guards against it being nil, so fn
// must do its own nil check before dereferencing.
//
// NOTE(review): fn is invoked with entryLock held. It should not call other
// FileHandle methods that acquire entryLock (GetEntry, SetEntry and AddChunks
// do) — presumably entryLock is a non-reentrant mutex, in which case that
// would deadlock; confirm against the field's declaration.
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fn(fh.entry)
return fh.entry
}
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entryLock.Lock() fh.entryLock.Lock()
defer fh.entryLock.Unlock() defer fh.entryLock.Unlock()

7
weed/mount/filehandle_map.go

@ -1,8 +1,9 @@
package mount package mount
import ( import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"sync" "sync"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
) )
type FileHandleToInode struct { type FileHandleToInode struct {
@ -49,7 +50,9 @@ func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *fil
} else { } else {
fh.counter++ fh.counter++
} }
fh.entry = entry
if fh.entry != entry {
fh.SetEntry(entry)
}
return fh return fh
} }

3
weed/mount/weedfs.go

@ -135,10 +135,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
} }
var found bool var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found { if fh, found = wfs.fhmap.FindFileHandle(inode); found {
entry = fh.GetEntry()
entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
if entry != nil && fh.entry.Attributes == nil { if entry != nil && fh.entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{} entry.Attributes = &filer_pb.FuseAttributes{}
} }
})
} else { } else {
entry, status = wfs.maybeLoadEntry(path) entry, status = wfs.maybeLoadEntry(path)
} }

8
weed/mount/weedfs_dir_lookup.go

@ -2,7 +2,9 @@ package mount
import ( import (
"context" "context"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
@ -55,10 +57,14 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true) inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
if fh, found := wfs.fhmap.FindFileHandle(inode); found && fh.entry != nil {
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
fh.entryLock.Lock()
if fh.entry != nil {
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry)) glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(fh.entry))
localEntry = filer.FromPbEntry(string(dirPath), fh.entry) localEntry = filer.FromPbEntry(string(dirPath), fh.entry)
} }
fh.entryLock.Unlock()
}
wfs.outputFilerEntry(out, inode, localEntry) wfs.outputFilerEntry(out, inode, localEntry)

Loading…
Cancel
Save