
context cancellation during range reads of large files (#7092)

* context cancellation during range reads of large files

* address comments
Branch: pull/7095/head
Author: Chris Lu
Commit: a834327755
3 changed files:

  1. weed/filer/filechunk_manifest.go (40 lines changed)
  2. weed/filer/reader_at.go (2 lines changed)
  3. weed/util/http/http_global_client_util.go (39 lines changed)
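
Every hunk below applies the same idiom: a non-blocking select on ctx.Done() at each checkpoint, so a live context costs a single channel poll while a cancelled one aborts the read immediately with ctx.Err(). A self-contained illustration of the checkpoint pattern (the worker loop is invented for this sketch, not code from the PR):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // doWork polls a cancellation checkpoint at the top of every
    // iteration, mirroring the checks added to the read loops below.
    func doWork(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err() // context.Canceled or context.DeadlineExceeded
            default:
            }
            time.Sleep(10 * time.Millisecond) // stand-in for one unit of read work
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()
        fmt.Println(doWork(ctx)) // prints "context deadline exceeded"
    }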

weed/filer/filechunk_manifest.go

@@ -116,13 +116,13 @@ func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileI
 	return nil
 }
 
-func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
-	urlStrings, err := lookupFileIdFn(context.Background(), fileId)
+func fetchChunkRange(ctx context.Context, buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
+	urlStrings, err := lookupFileIdFn(ctx, fileId)
 	if err != nil {
-		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+		glog.ErrorfCtx(ctx, "operation LookupFileId %s failed, err: %v", fileId, err)
 		return 0, err
 	}
-	return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId)
+	return util_http.RetriedFetchChunkData(ctx, buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId)
 }
 
 func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
@@ -131,12 +131,34 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
 	var totalWritten int
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+		// Check for context cancellation before starting retry loop
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
 		retriedCnt := 0
 		for _, urlString := range urlStrings {
+			// Check for context cancellation before each volume server request
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
 			retriedCnt++
 			var localProcessed int
 			var writeErr error
 			shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+				// Check for context cancellation during data processing
+				select {
+				case <-ctx.Done():
+					writeErr = ctx.Err()
+					return
+				default:
+				}
+
 				if totalWritten > localProcessed {
 					toBeSkipped := totalWritten - localProcessed
 					if len(data) <= toBeSkipped {
@@ -170,7 +192,15 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
 		}
 		if err != nil && shouldRetry {
 			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
-			time.Sleep(waitTime)
+			// Sleep with proper context cancellation and timer cleanup
+			timer := time.NewTimer(waitTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return ctx.Err()
+			case <-timer.C:
+				// Continue with retry
+			}
 		} else {
 			break
 		}
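
The timer-based wait above replaces time.Sleep so a cancelled caller stops waiting out the retry backoff, and timer.Stop() releases the timer when cancellation wins the select. The same pattern could be factored into a helper; a minimal sketch (sleepCtx is a hypothetical name, not part of this change):

    import (
        "context"
        "time"
    )

    // sleepCtx waits for d or until ctx is cancelled, whichever comes first.
    // It returns ctx.Err() on cancellation and nil after a full sleep.
    func sleepCtx(ctx context.Context, d time.Duration) error {
        timer := time.NewTimer(d)
        defer timer.Stop() // release the timer on either exit path
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-timer.C:
            return nil
        }
    }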

weed/filer/reader_at.go

@@ -199,7 +199,7 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
 		if n > 0 {
 			return n, err
 		}
-		return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
+		return fetchChunkRange(context.Background(), buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
 	}
 
 	shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
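
This is the one touched call site that cannot yet supply a real context: readChunkSliceAt has no ctx parameter, so it passes context.Background(), which is never cancelled. The new fetchChunkRange signature makes that limitation explicit; reads entering through ChunkReadAt still cannot be interrupted until a later change threads a caller context through this path.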

weed/util/http/http_global_client_util.go

@@ -357,6 +357,13 @@ func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, ciph
 	defer mem.Free(buf)
 
 	for {
+		// Check for context cancellation before each read
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		default:
+		}
+
 		m, err = reader.Read(buf)
 		if m > 0 {
 			fn(buf[:m])
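
The per-iteration check here is what makes cancellation effective for large files: ReadUrlAsStreamAuthenticated streams the response body in buf-sized pieces, so this loop can run many times for a single ranged read, and every pass now observes cancellation before issuing another reader.Read.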
@@ -482,12 +489,34 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 	var shouldRetry bool
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+		// Check for context cancellation before starting retry loop
+		select {
+		case <-ctx.Done():
+			return n, ctx.Err()
+		default:
+		}
+
 		for _, urlString := range urlStrings {
+			// Check for context cancellation before each volume server request
+			select {
+			case <-ctx.Done():
+				return n, ctx.Err()
+			default:
+			}
+
 			n = 0
 			if strings.Contains(urlString, "%") {
 				urlString = url.PathEscape(urlString)
 			}
 			shouldRetry, err = ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+				// Check for context cancellation during data processing
+				select {
+				case <-ctx.Done():
+					// Stop processing data when context is cancelled
+					return
+				default:
+				}
+
 				if n < len(buffer) {
 					x := copy(buffer[n:], data)
 					n += x
@@ -504,7 +533,15 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 		}
 		if err != nil && shouldRetry {
 			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
-			time.Sleep(waitTime)
+			// Sleep with proper context cancellation and timer cleanup
+			timer := time.NewTimer(waitTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return n, ctx.Err()
+			case <-timer.C:
+				// Continue with retry
+			}
 		} else {
 			break
 		}
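
With ctx threaded all the way through, a caller can bound an entire fetch, including the retry backoff sleeps. A hedged usage sketch (fetchWithDeadline and its inputs are hypothetical; only the RetriedFetchChunkData call shape comes from this diff):

    import (
        "context"
        "time"

        util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    )

    // fetchWithDeadline bounds one chunk fetch to 30 seconds. urlStrings and
    // fileId would come from a prior volume-location lookup by the caller.
    func fetchWithDeadline(urlStrings []string, fileId string, buffer []byte) (int, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        // cipherKey=nil, isGzipped=false, isFullChunk=false, offset=0
        return util_http.RetriedFetchChunkData(ctx, buffer, urlStrings, nil, false, false, 0, fileId)
    }

If the deadline fires mid-read, the checkpoints above return context.DeadlineExceeded instead of sleeping through another retry round.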
