diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 2abbc6729..18ed8fa8f 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -116,13 +116,13 @@ func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileI return nil } -func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { - urlStrings, err := lookupFileIdFn(context.Background(), fileId) +func fetchChunkRange(ctx context.Context, buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { + urlStrings, err := lookupFileIdFn(ctx, fileId) if err != nil { - glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) + glog.ErrorfCtx(ctx, "operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId) + return util_http.RetriedFetchChunkData(ctx, buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId) } func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { @@ -131,12 +131,34 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin var totalWritten int for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { + // Check for context cancellation before starting retry loop + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + retriedCnt := 0 for _, urlString := range urlStrings { + // Check for context cancellation before each volume server request + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + retriedCnt++ var localProcessed int var writeErr error shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + // Check for context cancellation during data processing + select { + case <-ctx.Done(): + writeErr = ctx.Err() + return + default: + } + if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { @@ -170,7 +192,15 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin } if err != nil && shouldRetry { glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime) - time.Sleep(waitTime) + // Sleep with proper context cancellation and timer cleanup + timer := time.NewTimer(waitTime) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + // Continue with retry + } } else { break } diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 83e66523d..b33087777 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -199,7 +199,7 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next if n > 0 { return n, err } - return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) + return fetchChunkRange(context.Background(), buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) } shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache() diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 27398f3ec..78ed55fa7 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -357,6 +357,13 @@ func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, ciph defer mem.Free(buf) for { + // Check for context cancellation before each read + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + m, err = reader.Read(buf) if m > 0 { fn(buf[:m]) @@ -482,12 +489,34 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri var shouldRetry bool for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { + // Check for context cancellation before starting retry loop + select { + case <-ctx.Done(): + return n, ctx.Err() + default: + } + for _, urlString := range urlStrings { + // Check for context cancellation before each volume server request + select { + case <-ctx.Done(): + return n, ctx.Err() + default: + } + n = 0 if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } shouldRetry, err = ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + // Check for context cancellation during data processing + select { + case <-ctx.Done(): + // Stop processing data when context is cancelled + return + default: + } + if n < len(buffer) { x := copy(buffer[n:], data) n += x @@ -504,7 +533,15 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri } if err != nil && shouldRetry { glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime) - time.Sleep(waitTime) + // Sleep with proper context cancellation and timer cleanup + timer := time.NewTimer(waitTime) + select { + case <-ctx.Done(): + timer.Stop() + return n, ctx.Err() + case <-timer.C: + // Continue with retry + } } else { break }