
Context cancellation during range reading of large files (#7093)

* context cancellation during range reading of large files

* address comments

* cancellation for fuse read

* fix cancellation

* pass in context to each function to avoid a race condition

* Update reader_at_test.go

* remove dead code

* Update weed/filer/reader_at.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/filer/filechunk_group.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/filer/filechunk_group.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* address comments

* Update weed/mount/weedfs_file_read.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/mount/weedfs_file_lseek.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/mount/weedfs_file_read.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/filer/reader_at.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/mount/weedfs_file_lseek.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* test cancellation

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Chris Lu committed 2 months ago (committed by GitHub)
commit 4af182f880
  1. weed/command/filer_cat.go (7 lines changed)
  2. weed/filer/filechunk_group.go (14 lines changed)
  3. weed/filer/filechunk_group_test.go (141 lines changed)
  4. weed/filer/filechunk_section.go (19 lines changed)
  5. weed/filer/filer_on_meta_event.go (1 line changed)
  6. weed/filer/reader_at.go (18 lines changed)
  7. weed/filer/reader_at_test.go (3 lines changed)
  8. weed/mount/filehandle.go (5 lines changed)
  9. weed/mount/filehandle_read.go (6 lines changed)
  10. weed/mount/weedfs_file_lseek.go (15 lines changed)
  11. weed/mount/weedfs_file_read.go (34 lines changed)
  12. weed/mq/logstore/read_parquet_to_log.go (2 lines changed)
  13. weed/server/webdav_server.go (12 lines changed)
  14. weed/shell/command_fs_cat.go (3 lines changed)
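The changes across these files follow a single pattern: a context.Context is passed explicitly as the first parameter of every function on the read path (rather than stored implicitly, per the "pass in context to each function" note above), so that a large ranged read can stop as soon as the caller cancels or times out. The following is a minimal, self-contained sketch of that pattern, not the exact SeaweedFS implementation; the chunkReader interface and slowChunk type are invented here purely for illustration.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// chunkReader is a hypothetical stand-in for a per-chunk fetch (e.g. a volume read over the network).
type chunkReader interface {
	ReadAt(ctx context.Context, p []byte, off int64) (int, error)
}

// readAcross reads sequentially across chunks, checking for cancellation before each
// chunk so a cancelled or timed-out caller does not wait for the remaining fetches.
func readAcross(ctx context.Context, chunks []chunkReader, p []byte, off int64) (n int, err error) {
	for _, c := range chunks {
		if cerr := ctx.Err(); cerr != nil { // cancelled or deadline exceeded
			return n, cerr
		}
		if n >= len(p) {
			return n, nil
		}
		m, rerr := c.ReadAt(ctx, p[n:], off+int64(n))
		n += m
		if rerr != nil {
			return n, rerr
		}
	}
	return n, nil
}

// slowChunk simulates a slow fetch that honors the context, as a network read would.
type slowChunk struct{ delay time.Duration }

func (s slowChunk) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
	select {
	case <-time.After(s.delay):
		return len(p), nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	buf := make([]byte, 64)
	chunks := []chunkReader{slowChunk{50 * time.Millisecond}, slowChunk{50 * time.Millisecond}}
	_, err := readAcross(ctx, chunks, buf, 0)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true: the read stopped early
}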

weed/command/filer_cat.go (7 lines changed)

@ -3,14 +3,15 @@ package command
import (
"context"
"fmt"
"net/url"
"os"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"net/url"
"os"
"strings"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"

weed/filer/filechunk_group.go (14 lines changed)

@ -45,7 +45,7 @@ func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
return nil
}
func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
if offset >= fileSize {
return 0, 0, io.EOF
}
@ -68,7 +68,7 @@ func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (
n = int(int64(n) + rangeStop - rangeStart)
continue
}
xn, xTsNs, xErr := section.readDataAt(group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)
xn, xTsNs, xErr := section.readDataAt(ctx, group, fileSize, buff[rangeStart-offset:rangeStop-offset], rangeStart)
if xErr != nil {
return n + xn, max(tsNs, xTsNs), xErr
}
@ -123,14 +123,14 @@ const (
)
// FIXME: needa tests
func (group *ChunkGroup) SearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
func (group *ChunkGroup) SearchChunks(ctx context.Context, offset, fileSize int64, whence uint32) (found bool, out int64) {
group.sectionsLock.RLock()
defer group.sectionsLock.RUnlock()
return group.doSearchChunks(offset, fileSize, whence)
return group.doSearchChunks(ctx, offset, fileSize, whence)
}
func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (found bool, out int64) {
func (group *ChunkGroup) doSearchChunks(ctx context.Context, offset, fileSize int64, whence uint32) (found bool, out int64) {
sectionIndex, maxSectionIndex := SectionIndex(offset/SectionSize), SectionIndex(fileSize/SectionSize)
if whence == SEEK_DATA {
@ -139,7 +139,7 @@ func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (
if !foundSection {
continue
}
sectionStart := section.DataStartOffset(group, offset, fileSize)
sectionStart := section.DataStartOffset(ctx, group, offset, fileSize)
if sectionStart == -1 {
continue
}
@ -153,7 +153,7 @@ func (group *ChunkGroup) doSearchChunks(offset, fileSize int64, whence uint32) (
if !foundSection {
return true, offset
}
holeStart := section.NextStopOffset(group, offset, fileSize)
holeStart := section.NextStopOffset(ctx, group, offset, fileSize)
if holeStart%SectionSize == 0 {
continue
}

weed/filer/filechunk_group_test.go (141 lines changed)

@ -1,8 +1,11 @@
package filer
import (
"context"
"errors"
"io"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@ -25,7 +28,7 @@ func TestChunkGroup_ReadDataAt_ErrorHandling(t *testing.T) {
offset := int64(0)
// With an empty ChunkGroup, we should get no error
n, tsNs, err := group.ReadDataAt(fileSize, buff, offset)
n, tsNs, err := group.ReadDataAt(context.Background(), fileSize, buff, offset)
// Should return 100 (length of buffer) and no error since there are no sections
// and missing sections are filled with zeros
@ -44,7 +47,7 @@ func TestChunkGroup_ReadDataAt_ErrorHandling(t *testing.T) {
fileSize := int64(50) // File smaller than buffer
offset := int64(0)
n, tsNs, err := group.ReadDataAt(fileSize, buff, offset)
n, tsNs, err := group.ReadDataAt(context.Background(), fileSize, buff, offset)
// Should return 50 (file size) and no error
assert.Equal(t, 50, n)
@ -57,7 +60,7 @@ func TestChunkGroup_ReadDataAt_ErrorHandling(t *testing.T) {
fileSize := int64(50)
offset := int64(100) // Offset beyond file size
n, tsNs, err := group.ReadDataAt(fileSize, buff, offset)
n, tsNs, err := group.ReadDataAt(context.Background(), fileSize, buff, offset)
assert.Equal(t, 0, n)
assert.Equal(t, int64(0), tsNs)
@ -80,19 +83,19 @@ func TestChunkGroup_ReadDataAt_ErrorHandling(t *testing.T) {
fileSize := int64(1000)
// Test 1: Normal operation with no sections (filled with zeros)
n, tsNs, err := group.ReadDataAt(fileSize, buff, int64(0))
n, tsNs, err := group.ReadDataAt(context.Background(), fileSize, buff, int64(0))
assert.Equal(t, 100, n, "should read full buffer")
assert.Equal(t, int64(0), tsNs, "timestamp should be zero for missing sections")
assert.NoError(t, err, "should not error for missing sections")
// Test 2: Reading beyond file size should return io.EOF immediately
n, tsNs, err = group.ReadDataAt(fileSize, buff, fileSize+1)
n, tsNs, err = group.ReadDataAt(context.Background(), fileSize, buff, fileSize+1)
assert.Equal(t, 0, n, "should not read any bytes when beyond file size")
assert.Equal(t, int64(0), tsNs, "timestamp should be zero")
assert.Equal(t, io.EOF, err, "should return io.EOF when reading beyond file size")
// Test 3: Reading at exact file boundary
n, tsNs, err = group.ReadDataAt(fileSize, buff, fileSize)
n, tsNs, err = group.ReadDataAt(context.Background(), fileSize, buff, fileSize)
assert.Equal(t, 0, n, "should not read any bytes at exact file size boundary")
assert.Equal(t, int64(0), tsNs, "timestamp should be zero")
assert.Equal(t, io.EOF, err, "should return io.EOF at file boundary")
@ -102,6 +105,130 @@ func TestChunkGroup_ReadDataAt_ErrorHandling(t *testing.T) {
// This prevents later sections from masking earlier errors, especially
// preventing io.EOF from masking network errors or other real failures.
})
t.Run("Context Cancellation", func(t *testing.T) {
// Test 4: Context cancellation should be properly propagated through ReadDataAt
// This test verifies that the context parameter is properly threaded through
// the call chain and that cancellation checks are in place at the right points
// Test with a pre-cancelled context to ensure the cancellation is detected
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
group := &ChunkGroup{
sections: make(map[SectionIndex]*FileChunkSection),
}
buff := make([]byte, 100)
fileSize := int64(1000)
// Call ReadDataAt with the already cancelled context
n, tsNs, err := group.ReadDataAt(ctx, fileSize, buff, int64(0))
// For an empty ChunkGroup (no sections), the operation will complete successfully
// since it just fills the buffer with zeros. However, the important thing is that
// the context is properly threaded through the call chain.
// The actual cancellation would be more evident with real chunk sections that
// perform network operations.
if err != nil {
// If an error is returned, it should be a context cancellation error
assert.True(t,
errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded),
"Expected context.Canceled or context.DeadlineExceeded, got: %v", err)
} else {
// If no error (operation completed before cancellation check),
// verify normal behavior for empty ChunkGroup
assert.Equal(t, 100, n, "should read full buffer size when no sections exist")
assert.Equal(t, int64(0), tsNs, "timestamp should be zero")
t.Log("Operation completed before context cancellation was checked - this is expected for empty ChunkGroup")
}
})
t.Run("Context Cancellation with Timeout", func(t *testing.T) {
// Test 5: Context with timeout should be respected
group := &ChunkGroup{
sections: make(map[SectionIndex]*FileChunkSection),
}
// Create a context with a very short timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
buff := make([]byte, 100)
fileSize := int64(1000)
// This should fail due to timeout
n, tsNs, err := group.ReadDataAt(ctx, fileSize, buff, int64(0))
// For this simple case with no sections, it might complete before timeout
// But if it does timeout, we should handle it properly
if err != nil {
assert.True(t,
errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded),
"Expected context.Canceled or context.DeadlineExceeded when context times out, got: %v", err)
} else {
// If no error, verify normal behavior
assert.Equal(t, 100, n, "should read full buffer size when no sections exist")
assert.Equal(t, int64(0), tsNs, "timestamp should be zero")
}
})
}
func TestChunkGroup_SearchChunks_Cancellation(t *testing.T) {
t.Run("Context Cancellation in SearchChunks", func(t *testing.T) {
// Test that SearchChunks properly handles context cancellation
group := &ChunkGroup{
sections: make(map[SectionIndex]*FileChunkSection),
}
// Test with a pre-cancelled context
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
fileSize := int64(1000)
offset := int64(0)
whence := uint32(3) // SEEK_DATA
// Call SearchChunks with cancelled context
found, resultOffset := group.SearchChunks(ctx, offset, fileSize, whence)
// For an empty ChunkGroup, SearchChunks should complete quickly
// The main goal is to verify the context parameter is properly threaded through
// In real scenarios with actual chunk sections, context cancellation would be more meaningful
// Verify the function completes and returns reasonable values
assert.False(t, found, "should not find data in empty chunk group")
assert.Equal(t, int64(0), resultOffset, "should return 0 offset when no data found")
t.Log("SearchChunks completed with cancelled context - context threading verified")
})
t.Run("Context with Timeout in SearchChunks", func(t *testing.T) {
// Test SearchChunks with a timeout context
group := &ChunkGroup{
sections: make(map[SectionIndex]*FileChunkSection),
}
// Create a context with very short timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
fileSize := int64(1000)
offset := int64(0)
whence := uint32(3) // SEEK_DATA
// Call SearchChunks - should complete quickly for empty group
found, resultOffset := group.SearchChunks(ctx, offset, fileSize, whence)
// Verify reasonable behavior
assert.False(t, found, "should not find data in empty chunk group")
assert.Equal(t, int64(0), resultOffset, "should return 0 offset when no data found")
})
}
func TestChunkGroup_doSearchChunks(t *testing.T) {
@ -127,7 +254,7 @@ func TestChunkGroup_doSearchChunks(t *testing.T) {
group := &ChunkGroup{
sections: tt.fields.sections,
}
gotFound, gotOut := group.doSearchChunks(tt.args.offset, tt.args.fileSize, tt.args.whence)
gotFound, gotOut := group.doSearchChunks(context.Background(), tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantFound, gotFound, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
assert.Equalf(t, tt.wantOut, gotOut, "doSearchChunks(%v, %v, %v)", tt.args.offset, tt.args.fileSize, tt.args.whence)
})

weed/filer/filechunk_section.go (19 lines changed)

@ -1,6 +1,7 @@
package filer
import (
"context"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -62,7 +63,7 @@ func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]st
}
}
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkGroup, fileSize int64) {
section.lock.Lock()
defer section.lock.Unlock()
@ -84,25 +85,25 @@ func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64)
}
if section.reader == nil {
section.reader = NewChunkReaderAtFromClient(group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
}
section.isPrepared = true
section.reader.fileSize = fileSize
}
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
func (section *FileChunkSection) readDataAt(ctx context.Context, group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
section.setupForRead(group, fileSize)
section.setupForRead(ctx, group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()
return section.reader.ReadAtWithTime(buff, offset)
return section.reader.ReadAtWithTime(ctx, buff, offset)
}
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
func (section *FileChunkSection) DataStartOffset(ctx context.Context, group *ChunkGroup, offset int64, fileSize int64) int64 {
section.setupForRead(group, fileSize)
section.setupForRead(ctx, group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()
@ -119,9 +120,9 @@ func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64
return -1
}
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
func (section *FileChunkSection) NextStopOffset(ctx context.Context, group *ChunkGroup, offset int64, fileSize int64) int64 {
section.setupForRead(group, fileSize)
section.setupForRead(ctx, group, fileSize)
section.lock.RLock()
defer section.lock.RUnlock()

weed/filer/filer_on_meta_event.go (1 line changed)

@ -2,6 +2,7 @@ package filer
import (
"bytes"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"

weed/filer/reader_at.go (18 lines changed)

@ -20,6 +20,7 @@ type ChunkReadAt struct {
readerCache *ReaderCache
readerPattern *ReaderPattern
lastChunkFid string
ctx context.Context // Context used for cancellation during chunk read operations
}
var _ = io.ReaderAt(&ChunkReadAt{})
@ -87,13 +88,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
readerCache: readerCache,
readerPattern: NewReaderPattern(),
ctx: ctx,
}
}
@ -114,11 +116,11 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
defer c.chunkViews.Lock.RUnlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
n, _, err = c.doReadAt(p, offset)
n, _, err = c.doReadAt(c.ctx, p, offset)
return
}
func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) {
func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
@ -126,10 +128,10 @@ func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, e
defer c.chunkViews.Lock.RUnlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
return c.doReadAt(ctx, p, offset)
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) {
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
startOffset, remaining := offset, int64(len(p))
var nextChunks *Interval[*ChunkView]
@ -158,7 +160,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err err
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
return copied, ts, err
@ -192,14 +194,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err err
}
func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
if c.readerPattern.IsRandomMode() {
n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset)
if n > 0 {
return n, err
}
return fetchChunkRange(context.Background(), buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
return fetchChunkRange(ctx, buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
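
Grounded in the signatures above, here is a hedged sketch of a caller wiring a cancellable context through the updated API: the context given to NewChunkReaderAtFromClient backs plain ReadAt calls (which must keep the io.ReaderAt signature), while ReadAtWithTime additionally takes a per-call context. The function name readWithDeadline and the 30-second deadline are illustrative only, and the ReaderCache and chunk views are assumed to be built the way read_parquet_to_log.go and webdav_server.go do elsewhere in this commit.

package example

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
)

// readWithDeadline reads the first len(buf) bytes of a file through ChunkReadAt,
// abandoning the remaining chunk fetches if the deadline passes.
func readWithDeadline(readerCache *filer.ReaderCache, chunkViews *filer.IntervalList[*filer.ChunkView], fileSize int64, buf []byte) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, fileSize)

	n, _, err := readerAt.ReadAtWithTime(ctx, buf, 0)
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return n, fmt.Errorf("ranged read abandoned: %w", err)
	}
	return n, err
}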

weed/filer/reader_at_test.go (3 lines changed)

@ -2,6 +2,7 @@ package filer
import (
"bytes"
"context"
"io"
"math"
"strconv"
@ -91,7 +92,7 @@ func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, exp
if data == nil {
data = make([]byte, size)
}
n, _, err := readerAt.doReadAt(data, offset)
n, _, err := readerAt.doReadAt(context.Background(), data, offset)
if expectedN != n {
t.Errorf("unexpected read size: %d, expect: %d", n, expectedN)

weed/mount/filehandle.go (5 lines changed)

@ -1,12 +1,13 @@
package mount
import (
"os"
"sync"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"os"
"sync"
)
type FileHandleId uint64

weed/mount/filehandle_read.go (6 lines changed)

@ -23,6 +23,10 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs in
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
return fh.readFromChunksWithContext(context.Background(), buff, offset)
}
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
fh.entryLock.RLock()
defer fh.entryLock.RUnlock()
@ -60,7 +64,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, e
return int64(totalRead), 0, nil
}
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(fileSize, buff, offset)
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)

weed/mount/weedfs_file_lseek.go (15 lines changed)

@ -1,9 +1,11 @@
package mount
import (
"github.com/seaweedfs/seaweedfs/weed/util"
"context"
"syscall"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -54,8 +56,17 @@ func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekO
return ENXIO
}
// Create a context that will be cancelled when the cancel channel receives a signal
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
select {
case <-cancel:
cancelFunc()
}
}()
// search chunks for the offset
found, offset := fh.entryChunkGroup.SearchChunks(offset, fileSize, in.Whence)
found, offset := fh.entryChunkGroup.SearchChunks(ctx, offset, fileSize, in.Whence)
if found {
out.Offset = uint64(offset)
return fuse.OK

weed/mount/weedfs_file_read.go (34 lines changed)

@ -2,10 +2,12 @@ package mount
import (
"bytes"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -45,8 +47,17 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
fhActiveLock := fh.wfs.fhLockTable.AcquireLock("Read", fh.fh, util.SharedLock)
defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
// Create a context that will be cancelled when the cancel channel receives a signal
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
select {
case <-cancel:
cancelFunc()
}
}()
offset := int64(in.Offset)
totalRead, err := readDataByFileHandle(buff, fh, offset)
totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
return nil, fuse.EIO
@ -59,7 +70,7 @@ func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse
if bytes.Compare(mirrorData, buff[:totalRead]) != 0 {
againBuff := make([]byte, len(buff))
againRead, _ := readDataByFileHandle(againBuff, fh, offset)
againRead, _ := readDataByFileHandleWithContext(ctx, againBuff, fh, offset)
againCorrect := bytes.Compare(mirrorData, againBuff[:againRead]) == 0
againSame := bytes.Compare(buff[:totalRead], againBuff[:againRead]) == 0
@ -88,3 +99,20 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
}
return n, err
}
func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
// read data from source file
size := len(buff)
fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size)
n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
if err == nil || err == io.EOF {
maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
n = max(maxStop-offset, n)
}
if err == io.EOF {
err = nil
}
return n, err
}
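
Both Read above and Lseek in the previous file bridge go-fuse's cancel channel (a <-chan struct{}) into a context.Context via context.WithCancel plus a goroutine. Below is a hedged sketch of that bridge factored into a helper; the name contextFromFuseCancel is hypothetical (the commit inlines the pattern instead), and the extra ctx.Done() case is this sketch's addition so the goroutine also exits once the request finishes and the returned cancel function is called.

package example

import "context"

// contextFromFuseCancel mirrors the pattern used in Read and Lseek: a goroutine
// watches the fuse cancel channel and propagates it as context cancellation, so
// ChunkGroup.ReadDataAt / SearchChunks further down the call chain can stop early.
func contextFromFuseCancel(cancel <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	go func() {
		select {
		case <-cancel:
			cancelFunc() // the kernel interrupted the request; cancel downstream reads
		case <-ctx.Done():
			// the request completed and the caller invoked cancelFunc; stop waiting
		}
	}()
	return ctx, cancelFunc
}

A caller would then write ctx, cancelFunc := contextFromFuseCancel(cancel) followed by defer cancelFunc() before invoking readDataByFileHandleWithContext, matching the shape of the inlined code above.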

weed/mq/logstore/read_parquet_to_log.go (2 lines changed)

@ -55,7 +55,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))
readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
// create parquet reader
parquetReader := parquet.NewReader(readerAt)

weed/server/webdav_server.go (12 lines changed)

@ -3,13 +3,14 @@ package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"io"
"os"
"path"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
@ -126,6 +127,7 @@ type WebDavFile struct {
visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
reader io.ReaderAt
bufWriter *buffered_writer.BufferedWriteCloser
ctx context.Context
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@ -269,6 +271,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
name: fullFilePath,
isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
ctx: ctx,
}, nil
}
@ -277,7 +280,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
if err == os.ErrNotExist {
return nil, err
}
return &WebDavFile{fs: fs}, nil
return &WebDavFile{fs: fs, ctx: ctx}, nil
}
if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
fullFilePath += "/"
@ -288,6 +291,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
name: fullFilePath,
isDirectory: false,
bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
ctx: ctx,
}, nil
}
@ -557,12 +561,12 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.visibleIntervals == nil {
f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(f.ctx, filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil
}
if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize)
}
readSize, err = f.reader.ReadAt(p, f.off)

weed/shell/command_fs_cat.go (3 lines changed)

@ -3,10 +3,11 @@ package shell
import (
"context"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
)
func init() {
