Browse Source

fix FUSE read for large files

FUSE expects ReadAt do not return partial filled buffer with a nil error.
pull/1255/head
Chris Lu 5 years ago
parent
commit
d1439c5bd3
  1. 123
      weed/filer2/reader_at.go
  2. 33
      weed/filer2/stream.go
  3. 3
      weed/filesys/file.go
  4. 12
      weed/filesys/filehandle.go
  5. 9
      weed/server/webdav_server.go

123
weed/filer2/reader_at.go

@ -0,0 +1,123 @@
package filer2
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
buffer []byte
bufferOffset int64
lookupFileId func(fileId string) (targetUrl string, err error)
readerLock sync.Mutex
}
// var _ = io.ReaderAt(&ChunkReadAt{})
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
vid := VolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
locations := resp.LocationsMap[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
return nil
})
return
},
bufferOffset: -1,
}
}
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
for n < len(p) && err == nil {
readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
n += readCount
err = readErr
if readCount == 0 {
return n, nil
}
}
return
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
var found bool
for _, chunk := range c.chunkViews {
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
found = true
if c.bufferOffset != chunk.LogicOffset {
c.fetchChunkToBuffer(chunk)
}
break
}
}
if !found {
return 0, io.EOF
}
n = copy(p, c.buffer[offset-c.bufferOffset:])
// fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
return
}
func (c *ChunkReadAt) fetchChunkToBuffer(chunkView *ChunkView) error {
// fmt.Printf("fetching %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
urlString, err := c.lookupFileId(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
var buffer bytes.Buffer
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
return err
}
c.buffer = buffer.Bytes()
c.bufferOffset = chunkView.LogicOffset
glog.V(3).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
return nil
}

33
weed/filer2/stream.go

@ -2,8 +2,6 @@ package filer2
import (
"bytes"
"context"
"fmt"
"io"
"math"
"strings"
@ -71,37 +69,6 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [
}
}
func NewChunkStreamReaderFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
return &ChunkStreamReader{
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
vid := VolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
locations := resp.LocationsMap[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
return nil
})
return
},
}
}
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
if c.isBufferEmpty() {
if c.chunkIndex >= len(c.chunkViews) {

3
weed/filesys/file.go

@ -34,7 +34,7 @@ type File struct {
entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval
isOpen int
reader io.ReadSeeker
reader io.ReaderAt
}
func (file *File) fullpath() util.FullPath {
@ -249,6 +249,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.entryViewCache = newVisibles
newVisibles = t
}
file.reader = nil
glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))

12
weed/filesys/filehandle.go

@ -3,7 +3,6 @@ package filesys
import (
"context"
"fmt"
"io"
"math"
"mime"
"path"
@ -30,6 +29,7 @@ type FileHandle struct {
NodeId fuse.NodeID // file or directory the request is about
Uid uint32 // user ID of process making request
Gid uint32 // group ID of process making request
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
@ -89,13 +89,13 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
fh.f.reader = nil
}
if fh.f.reader == nil {
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
fh.f.reader = filer2.NewChunkStreamReaderFromClient(fh.f.wfs, chunkViews)
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews)
}
fh.f.reader.Seek(offset, io.SeekStart)
totalRead, err := fh.f.reader.Read(buff)
totalRead, err := fh.f.reader.ReadAt(buff, offset)
if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
@ -168,8 +168,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return fuse.EIO
}
fh.f.addChunks(chunks)
if len(chunks) > 0 {
fh.f.addChunks(chunks)
fh.dirtyMetadata = true
}

9
weed/server/webdav_server.go

@ -90,7 +90,7 @@ type WebDavFile struct {
off int64
entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval
reader io.ReadSeeker
reader io.ReaderAt
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@ -475,12 +475,11 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
f.reader = nil
}
if f.reader == nil {
chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
f.reader = filer2.NewChunkStreamReaderFromClient(f.fs, chunkViews)
chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews)
}
f.reader.Seek(f.off, io.SeekStart)
readSize, err = f.reader.Read(p)
readSize, err = f.reader.ReadAt(p, f.off)
glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
f.off += int64(readSize)

Loading…
Cancel
Save