Browse Source

Merge branch 'master' of https://github.com/chrislusf/seaweedfs

pull/4407/head
zemul 2 years ago
parent
commit
09f7f9331d
  1. 4
      k8s/charts/seaweedfs/Chart.yaml
  2. 30
      weed/filer/filechunk_section.go
  3. 6
      weed/filer/interval_list.go
  4. 8
      weed/filer/reader_at.go
  5. 5
      weed/mount/filehandle.go
  6. 4
      weed/mount/filehandle_read.go
  7. 4
      weed/mount/weedfs_dir_lookup.go
  8. 8
      weed/mount/weedfs_file_lseek.go
  9. 4
      weed/mount/weedfs_file_read.go
  10. 4
      weed/mount/weedfs_file_sync.go
  11. 2
      weed/util/constants.go

4
k8s/charts/seaweedfs/Chart.yaml

@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "3.45"
version: "3.45"
appVersion: "3.46"
version: "3.46"

30
weed/filer/filechunk_section.go

@@ -13,7 +13,8 @@ type FileChunkSection struct {
visibleIntervals *IntervalList[*VisibleInterval]
chunkViews *IntervalList[*ChunkView]
reader *ChunkReadAt
lock sync.Mutex
lock sync.RWMutex
isPrepared bool
}
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
@@ -61,6 +62,19 @@ func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]st
}
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
if section.isPrepared {
section.reader.fileSize = fileSize
return
}
section.lock.Lock()
defer section.lock.Unlock()
if section.isPrepared {
section.reader.fileSize = fileSize
return
}
if section.visibleIntervals == nil {
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
@@ -76,23 +90,25 @@ func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64)
if section.reader == nil {
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
}
section.isPrepared = true
section.reader.fileSize = fileSize
}
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()
return section.reader.ReadAtWithTime(buff, offset)
}
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
visible := x.Value
@@ -108,10 +124,10 @@ func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64
}
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
section.lock.Lock()
defer section.lock.Unlock()
section.setupForRead(group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()
isAfterOffset := false
for x := section.visibleIntervals.Front(); x != nil; x = x.Next {

6
weed/filer/interval_list.go

@@ -27,7 +27,7 @@ func (interval *Interval[T]) Size() int64 {
type IntervalList[T IntervalValue] struct {
head *Interval[T]
tail *Interval[T]
Lock sync.Mutex
Lock sync.RWMutex
}
func NewIntervalList[T IntervalValue]() *IntervalList[T] {
@@ -248,8 +248,8 @@ func (list *IntervalList[T]) overlayInterval(interval *Interval[T]) {
}
func (list *IntervalList[T]) Len() int {
list.Lock.Lock()
defer list.Lock.Unlock()
list.Lock.RLock()
defer list.Lock.RUnlock()
var count int
for t := list.head; t != nil; t = t.Next {

8
weed/filer/reader_at.go

@@ -106,8 +106,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
c.chunkViews.Lock.Lock()
defer c.chunkViews.Lock.Unlock()
c.chunkViews.Lock.RLock()
defer c.chunkViews.Lock.RUnlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
n, _, err = c.doReadAt(p, offset)
@@ -118,8 +118,8 @@ func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, e
c.readerPattern.MonitorReadAt(offset, len(p))
c.chunkViews.Lock.Lock()
defer c.chunkViews.Lock.Unlock()
c.chunkViews.Lock.RLock()
defer c.chunkViews.Lock.RUnlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)

5
weed/mount/filehandle.go

@@ -17,7 +17,7 @@ type FileHandle struct {
fh FileHandleId
counter int64
entry *LockedEntry
entryLock sync.Mutex
entryLock sync.RWMutex
entryChunkGroup *filer.ChunkGroup
inode uint64
wfs *WFS
@@ -27,8 +27,7 @@ type FileHandle struct {
dirtyPages *PageWriter
reader *filer.ChunkReadAt
contentType string
handle uint64
sync.Mutex
sync.RWMutex
isDeleted bool

4
weed/mount/filehandle_read.go

@@ -23,8 +23,8 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs in
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
fileFullPath := fh.FullPath()

4
weed/mount/weedfs_dir_lookup.go

@@ -58,12 +58,12 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.Crtime.Unix(), localEntry.IsDirectory(), len(localEntry.HardLinkId) > 0, localEntry.Inode, true)
if fh, found := wfs.fhmap.FindFileHandle(inode); found {
fh.entryLock.Lock()
fh.entryLock.RLock()
if entry := fh.GetEntry(); entry != nil {
glog.V(4).Infof("lookup opened file %s size %d", dirPath.Child(localEntry.Name()), filer.FileSize(entry))
localEntry = filer.FromPbEntry(string(dirPath), entry)
}
fh.entryLock.Unlock()
fh.entryLock.RUnlock()
}
wfs.outputFilerEntry(out, inode, localEntry)

8
weed/mount/weedfs_file_lseek.go

@@ -35,10 +35,10 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
}
// lock the file until the proper offset was calculated
fh.Lock()
defer fh.Unlock()
fh.entryLock.Lock()
defer fh.entryLock.Unlock()
fh.RLock()
defer fh.RUnlock()
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
fileSize := int64(filer.FileSize(fh.GetEntry()))
offset := max(int64(in.Offset), 0)

4
weed/mount/weedfs_file_read.go

@@ -41,8 +41,8 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
return nil, fuse.ENOENT
}
fh.Lock()
defer fh.Unlock()
fh.RLock()
defer fh.RUnlock()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)

4
weed/mount/weedfs_file_sync.go

@@ -96,7 +96,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
fileFullPath := fh.FullPath()
dir, name := fileFullPath.DirAndName()
// send the data to the OS
glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.handle)
glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.fh)
if !wfs.IsOverQuota {
if err := fh.dirtyPages.FlushData(); err != nil {
@@ -177,7 +177,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
}
if err != nil {
glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.handle, err)
glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.fh, err)
return fuse.EIO
}

2
weed/util/constants.go

@@ -5,7 +5,7 @@ import (
)
var (
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.45)
VERSION_NUMBER = fmt.Sprintf("%.02f", 3.46)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)

Loading…
Cancel
Save