Browse Source

stream read large files

pull/3140/head
chrislu 3 years ago
parent
commit
ecef844dfc
  1. 51
      weed/server/volume_server_handlers_read.go
  2. 30
      weed/storage/needle/needle_read_page.go
  3. 2
      weed/storage/needle/needle_read_test.go
  4. 15
      weed/storage/store.go
  5. 80
      weed/storage/volume_read.go

51
weed/server/volume_server_handlers_read.go

@ -127,6 +127,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
var count int
var needleSize types.Size
readOption.AttemptMetaOnly, readOption.MustMetaOnly = shouldAttemptStreamWrite(hasVolume, ext, r)
onReadSizeFn := func(size types.Size) {
needleSize = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
@ -218,11 +219,31 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
if !readOption.IsMetaOnly {
rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
glog.V(2).Infoln("response write error:", e)
}
} else {
vs.streamWriteResponseContent(filename, mtype, volumeId, n, w, r, readOption)
}
}
if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
glog.V(2).Infoln("response write error:", e)
func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) (shouldAttempt bool, mustMetaOnly bool) {
if !hasLocalVolume {
return false, false
}
if len(ext) > 0 {
ext = strings.ToLower(ext)
}
if r.Method == "HEAD" {
return true, true
}
_, _, _, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
return false, false
}
return true, false
}
func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
@ -318,3 +339,27 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
})
return nil
}
func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType string, volumeId needle.VolumeId, n *needle.Needle, w http.ResponseWriter, r *http.Request, readOption *storage.ReadOption) {
totalSize := int64(n.DataSize)
if mimeType == "" {
if ext := filepath.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
w.Header().Set("Accept-Ranges", "bytes")
adjustPassthroughHeaders(w, r, filename)
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
})
}

30
weed/storage/needle/needle_read_page.go

@ -10,26 +10,28 @@ import (
)
// ReadNeedleDataInto uses a needle without n.Data to read the content into an io.Writer
func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, offset int64, buf []byte, writer io.Writer, expectedChecksumValue uint32) (err error) {
func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset int64, buf []byte, writer io.Writer, needleOffset int64, size int64, expectedChecksumValue uint32) (err error) {
crc := CRC(0)
for x := 0; ; x += len(buf) {
count, err := n.ReadNeedleData(r, offset, buf, int64(x))
for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
if count > 0 {
crc = crc.Update(buf[0:count])
if _, err = writer.Write(buf[0:count]); err != nil {
return fmt.Errorf("ReadNeedleData write: %v", err)
}
}
if err != nil {
if err == io.EOF {
err = nil
break
}
return fmt.Errorf("ReadNeedleData: %v", err)
}
if count > 0 {
crc = crc.Update(buf[0:count])
if _, err = writer.Write(buf[0:count]); err != nil {
return fmt.Errorf("ReadNeedleData write: %v", err)
}
} else {
if count <= 0 {
break
}
}
if expectedChecksumValue != crc.Value() {
if needleOffset == 0 && size == int64(n.DataSize) && expectedChecksumValue != crc.Value() {
return fmt.Errorf("ReadNeedleData checksum %v expected %v", crc.Value(), expectedChecksumValue)
}
return nil
@ -65,14 +67,18 @@ func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size
return 0, err
}
n.ParseNeedleHeader(bytes)
if n.Size != size {
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
return 0, ErrorSizeMismatch
}
}
n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
dataSize := GetActualSize(size, version)
stopOffset := offset + dataSize
metaSize := stopOffset - startOffset
fmt.Printf("offset %d dataSize %d\n", offset, dataSize)
fmt.Printf("read needle meta [%d,%d) size %d\n", startOffset, stopOffset, metaSize)
metaSlice := make([]byte, int(metaSize))
count, err = r.ReadAt(metaSlice, startOffset)

2
weed/storage/needle/needle_read_test.go

@ -74,7 +74,7 @@ func TestPageRead(t *testing.T) {
fmt.Printf("Checksum value %d\n", checksumValue)
buf := make([]byte, 1024)
if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, checksumValue); err != nil {
if err = n.ReadNeedleDataInto(datBackend, offset, buf, io.Discard, 0, int64(n.DataSize), checksumValue); err != nil {
t.Fatalf("ReadNeedleDataInto: %v", err)
}

15
weed/storage/store.go

@ -2,6 +2,7 @@ package storage
import (
"fmt"
"io"
"path/filepath"
"strings"
"sync/atomic"
@ -26,7 +27,13 @@ const (
)
type ReadOption struct {
ReadDeleted bool
ReadDeleted bool
AttemptMetaOnly bool
MustMetaOnly bool
IsMetaOnly bool // read status
ChecksumValue uint32 // read status
VolumeRevision uint16
IsOutOfRange bool // whether need to read over MaxPossibleVolumeSize
}
/*
@ -375,6 +382,12 @@ func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption
}
return 0, fmt.Errorf("volume %d not found", i)
}
func (s *Store) ReadVolumeNeedleDataInto(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) error {
if v := s.findVolume(i); v != nil {
return v.readNeedleDataInto(n, readOption, writer, offset, size)
}
return fmt.Errorf("volume %d not found", i)
}
func (s *Store) GetVolume(i needle.VolumeId) *Volume {
return s.findVolume(i)
}

80
weed/storage/volume_read.go

@ -2,6 +2,7 @@ package storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"time"
@ -12,8 +13,10 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
const PagedReadLimit = 1024 * 1024
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (count int, err error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
@ -36,31 +39,84 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
if onReadSizeFn != nil {
onReadSizeFn(readSize)
}
err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
if readOption != nil && readOption.AttemptMetaOnly && readSize > PagedReadLimit {
readOption.VolumeRevision = v.SuperBlock.CompactionRevision
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
readOption.IsOutOfRange = true
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
if err != nil {
return 0, err
}
if !n.IsCompressed() && !n.IsChunkedManifest() {
readOption.IsMetaOnly = true
}
}
v.checkReadWriteError(err)
if err != nil {
return 0, err
if readOption == nil || !readOption.IsMetaOnly {
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
v.checkReadWriteError(err)
if err != nil {
return 0, err
}
}
bytesRead := len(n.Data)
count = int(n.DataSize)
if !n.HasTtl() {
return bytesRead, nil
return
}
ttlMinutes := n.Ttl.Minutes()
if ttlMinutes == 0 {
return bytesRead, nil
return
}
if !n.HasLastModifiedDate() {
return bytesRead, nil
return
}
if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
return bytesRead, nil
return
}
return -1, ErrorNotFound
}
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, writer io.Writer, offset int64, size int64) (err error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
return ErrorNotFound
}
readSize := nv.Size
if readSize.IsDeleted() {
if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
glog.V(3).Infof("reading deleted %s", n.String())
readSize = -readSize
} else {
return ErrorDeleted
}
}
if readSize == 0 {
return nil
}
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
// the volume is compacted
readOption.IsOutOfRange = false
readOption.ChecksumValue, err = n.ReadNeedleMeta(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
}
buf := mem.Allocate(1024 * 1024)
defer mem.Free(buf)
actualOffset := nv.Offset.ToActualOffset()
if readOption.IsOutOfRange {
actualOffset += int64(MaxPossibleVolumeSize)
}
return n.ReadNeedleDataInto(v.DataBackend, actualOffset, buf, writer, offset, size, readOption.ChecksumValue)
}
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
v.dataFileAccessLock.RLock()

Loading…
Cancel
Save