Browse Source

FUSE mount: stream read data with buffer

fix https://github.com/chrislusf/seaweedfs/issues/1244
pull/1255/head
Chris Lu 5 years ago
parent
commit
65d2ea9fb0
  1. 59
      weed/filer2/stream.go
  2. 5
      weed/filesys/file.go
  3. 16
      weed/filesys/filehandle.go
  4. 2
      weed/server/filer_server_handlers_read.go

59
weed/filer2/stream.go

@ -2,8 +2,11 @@ package filer2
import ( import (
"bytes" "bytes"
"context"
"fmt"
"io" "io"
"math" "math"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -51,18 +54,51 @@ type ChunkStreamReader struct {
bufferOffset int64 bufferOffset int64
bufferPos int bufferPos int
chunkIndex int chunkIndex int
lookupFileId func(fileId string) (targetUrl string, err error)
} }
var _ = io.ReadSeeker(&ChunkStreamReader{}) var _ = io.ReadSeeker(&ChunkStreamReader{})
func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
return &ChunkStreamReader{ return &ChunkStreamReader{
masterClient: masterClient,
chunkViews: chunkViews,
bufferOffset: -1,
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
return masterClient.LookupFileId(fileId)
},
}
}
func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader {
return &ChunkStreamReader{
chunkViews: chunkViews,
lookupFileId: func(fileId string) (targetUrl string, err error) {
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
vid := fileIdToVolumeId(fileId)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
locations := resp.LocationsMap[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(0).Infof("failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
return nil
})
return
},
} }
} }
@ -72,6 +108,7 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
return 0, io.EOF return 0, io.EOF
} }
chunkView := c.chunkViews[c.chunkIndex] chunkView := c.chunkViews[c.chunkIndex]
println("fetch1")
c.fetchChunkToBuffer(chunkView) c.fetchChunkToBuffer(chunkView)
c.chunkIndex++ c.chunkIndex++
} }
@ -105,7 +142,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
for i, chunk := range c.chunkViews { for i, chunk := range c.chunkViews {
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
if c.isBufferEmpty() || c.bufferOffset != offset {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
c.fetchChunkToBuffer(chunk) c.fetchChunkToBuffer(chunk)
c.chunkIndex = i + 1 c.chunkIndex = i + 1
break break
@ -119,7 +156,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
} }
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
urlString, err := c.masterClient.LookupFileId(chunkView.FileId)
urlString, err := c.lookupFileId(chunkView.FileId)
if err != nil { if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err return err
@ -136,5 +173,15 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
c.bufferPos = 0 c.bufferPos = 0
c.bufferOffset = chunkView.LogicOffset c.bufferOffset = chunkView.LogicOffset
// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
return nil return nil
} }
func fileIdToVolumeId(fileId string) (volumeId string) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return fileId
}
return parts[0]
}

5
weed/filesys/file.go

@ -2,6 +2,7 @@ package filesys
import ( import (
"context" "context"
"io"
"os" "os"
"sort" "sort"
"time" "time"
@ -32,6 +33,7 @@ type File struct {
entry *filer_pb.Entry entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval entryViewCache []filer2.VisibleInterval
isOpen int isOpen int
reader io.ReadSeeker
} }
func (file *File) fullpath() filer2.FullPath { func (file *File) fullpath() filer2.FullPath {
@ -119,6 +121,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
} }
file.entry.Chunks = chunks file.entry.Chunks = chunks
file.entryViewCache = nil file.entryViewCache = nil
file.reader = nil
} }
file.entry.Attributes.FileSize = req.Size file.entry.Attributes.FileSize = req.Size
} }
@ -245,6 +248,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.entryViewCache = newVisibles file.entryViewCache = newVisibles
newVisibles = t newVisibles = t
} }
file.reader = nil
glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
@ -254,6 +258,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) { func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry file.entry = entry
file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
file.reader = nil
} }
func (file *File) saveEntry() error { func (file *File) saveEntry() error {

16
weed/filesys/filehandle.go

@ -3,6 +3,8 @@ package filesys
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"math"
"mime" "mime"
"path" "path"
"time" "time"
@ -85,17 +87,23 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
if fh.f.entryViewCache == nil { if fh.f.entryViewCache == nil {
fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
fh.f.reader = nil
}
if fh.f.reader == nil {
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
fh.f.reader = filer2.NewChunkStreamReaderFromClient(fh.f.wfs, chunkViews)
} }
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff))
totalRead, err := filer2.ReadIntoBuffer(fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset)
fh.f.reader.Seek(offset, io.SeekStart)
totalRead, err := fh.f.reader.Read(buff)
if err != nil { if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
} }
return totalRead, err
// glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
} }
// Write to the file handle // Write to the file handle

2
weed/server/filer_server_handlers_read.go

@ -94,7 +94,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
ext := filepath.Ext(filename) ext := filepath.Ext(filename)
width, height, mode, shouldResize := shouldResizeImages(ext, r) width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize { if shouldResize {
chunkedFileReader := filer2.NewChunkStreamReader(fs.filer.MasterClient, entry.Chunks)
chunkedFileReader := filer2.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, entry.Chunks)
rs, _, _ := images.Resized(ext, chunkedFileReader, width, height, mode) rs, _, _ := images.Resized(ext, chunkedFileReader, width, height, mode)
io.Copy(w, rs) io.Copy(w, rs)
return return

Loading…
Cancel
Save