Browse Source

properly lock file.entry object

fix https://github.com/chrislusf/seaweedfs/issues/1882
pull/1883/head
Chris Lu 4 years ago
parent
commit
5ba4b479f8
  1. 25
      weed/filesys/dir_link.go
  2. 9
      weed/filesys/dirty_page.go
  3. 27
      weed/filesys/file.go
  4. 66
      weed/filesys/filehandle.go
  5. 5
      weed/filesys/fscache.go

25
weed/filesys/dir_link.go

@ -35,15 +35,20 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil, err
}
oldEntry := oldFile.getEntry()
if oldEntry == nil {
return nil, fuse.EIO
}
// update old file to hardlink mode
if len(oldFile.entry.HardLinkId) == 0 {
oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldFile.entry.HardLinkCounter = 1
if len(oldEntry.HardLinkId) == 0 {
oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldEntry.HardLinkCounter = 1
}
oldFile.entry.HardLinkCounter++
oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
Entry: oldFile.entry,
Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
@ -53,11 +58,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
Attributes: oldFile.entry.Attributes,
Chunks: oldFile.entry.Chunks,
Extended: oldFile.entry.Extended,
HardLinkId: oldFile.entry.HardLinkId,
HardLinkCounter: oldFile.entry.HardLinkCounter,
Attributes: oldEntry.Attributes,
Chunks: oldEntry.Chunks,
Extended: oldEntry.Extended,
HardLinkId: oldEntry.HardLinkId,
HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}

9
weed/filesys/dirty_page.go

@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false
}
fileSize := int64(pages.f.entry.Attributes.FileSize)
entry := pages.f.getEntry()
if entry == nil {
return false
}
fileSize := int64(entry.Attributes.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 {

27
weed/filesys/file.go

@ -5,6 +5,7 @@ import (
"io"
"os"
"sort"
"sync"
"time"
"github.com/seaweedfs/fuse"
@ -33,6 +34,7 @@ type File struct {
dir *Dir
wfs *WFS
entry *filer_pb.Entry
entryLock sync.RWMutex
entryViewCache []filer.VisibleInterval
isOpen int
reader io.ReaderAt
@ -47,7 +49,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
entry := file.entry
entry := file.getEntry()
if file.isOpen <= 0 || entry == nil {
if entry, err = file.maybeLoadEntry(ctx); err != nil {
return err
@ -258,7 +260,7 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
entry = file.entry
entry = file.getEntry()
if file.isOpen > 0 {
return entry, nil
}
@ -299,8 +301,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
}
entry := file.getEntry()
if entry == nil {
return
}
// pick out-of-order chunks from existing chunks
for _, chunk := range file.entry.Chunks {
for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
@ -318,18 +325,22 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.reader = nil
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
file.entry.Chunks = append(file.entry.Chunks, newChunks...)
entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entryLock.Lock()
defer file.entryLock.Unlock()
file.entry = entry
file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
func (file *File) clearEntry() {
file.entryLock.Lock()
defer file.entryLock.Unlock()
file.entry = nil
file.entryViewCache = nil
file.reader = nil
@ -359,3 +370,9 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
return nil
})
}
func (file *File) getEntry() *filer_pb.Entry {
file.entryLock.RLock()
defer file.entryLock.RUnlock()
return file.entry
}

66
weed/filesys/filehandle.go

@ -40,8 +40,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Uid: uid,
Gid: gid,
}
if fh.f.entry != nil {
fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
entry := fh.f.getEntry()
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
@ -104,22 +105,27 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileSize := int64(filer.FileSize(fh.f.entry))
entry := fh.f.getEntry()
if entry == nil {
return 0, io.EOF
}
fileSize := int64(filer.FileSize(entry))
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
return 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) {
totalRead := copy(buff, fh.f.entry.Content[offset:])
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.f.entryViewCache == nil {
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@ -158,8 +164,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
}
fh.f.entry.Content = nil
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
entry := fh.f.getEntry()
if entry == nil {
return fuse.EIO
}
entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@ -242,35 +253,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
if fh.f.entry.Attributes.Uid == 0 {
fh.f.entry.Attributes.Uid = header.Uid
entry := fh.f.getEntry()
if entry == nil {
return nil
}
if entry.Attributes != nil {
entry.Attributes.Mime = fh.contentType
if entry.Attributes.Uid == 0 {
entry.Attributes.Uid = header.Uid
}
if fh.f.entry.Attributes.Gid == 0 {
fh.f.entry.Attributes.Gid = header.Gid
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = header.Gid
}
if fh.f.entry.Attributes.Crtime == 0 {
fh.f.entry.Attributes.Crtime = time.Now().Unix()
if entry.Attributes.Crtime == 0 {
entry.Attributes.Crtime = time.Now().Unix()
}
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
entry.Attributes.Collection = fh.dirtyPages.collection
entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
Entry: fh.f.entry,
Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
@ -278,7 +294,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
fh.f.entry.Chunks = append(chunks, manifestChunks...)
entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)

5
weed/filesys/fscache.go

@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
if f.entry != nil {
f.entry.Name = f.Name
entry := f.getEntry()
if entry != nil {
entry.Name = f.Name
}
}
parent.disconnectChild(target)

Loading…
Cancel
Save